You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/04/26 17:50:51 UTC
[35/47] git commit: More check style fixes relating to oozie module
More check style fixes relating to oozie module
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/9b4d845f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/9b4d845f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/9b4d845f
Branch: refs/heads/master
Commit: 9b4d845f018c4f6a0a659cde63131ed08f0fa811
Parents: 223d8f0
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Wed Apr 17 16:47:08 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Wed Apr 17 16:47:08 2013 +0530
----------------------------------------------------------------------
.../converter/AbstractOozieEntityMapper.java | 42 ++-
.../java/org/apache/falcon/logging/LogMover.java | 32 ++-
.../org/apache/falcon/logging/LogProvider.java | 10 +-
.../apache/falcon/service/FalconPathFilter.java | 3 +
.../service/SharedLibraryHostingService.java | 12 +-
.../java/org/apache/falcon/util/OozieUtils.java | 5 +
.../falcon/workflow/FalconPostProcessing.java | 9 +-
.../falcon/workflow/OozieWorkflowBuilder.java | 8 +-
.../falcon/workflow/engine/NullBundleJob.java | 3 +
.../falcon/workflow/engine/NullCoordJob.java | 3 +
.../falcon/workflow/engine/OozieClientFactory.java | 27 +-
.../workflow/engine/OozieHouseKeepingService.java | 5 +-
.../workflow/engine/OozieWorkflowEngine.java | 259 ++++++++-------
.../org/apache/oozie/client/CustomOozieClient.java | 7 +-
.../oozie/bundle/BundleUnmarshallingTest.java | 3 +
.../coordinator/CoordinatorUnmarshallingTest.java | 2 +-
.../oozie/workflow/FalconPostProcessingTest.java | 9 +-
.../oozie/workflow/WorkflowUnmarshallingTest.java | 3 +
.../apache/oozie/client/CustomOozieClientTest.java | 3 +
19 files changed, 274 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
index ac3e76e..f3ddb99 100644
--- a/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
+++ b/oozie/src/main/java/org/apache/falcon/converter/AbstractOozieEntityMapper.java
@@ -52,9 +52,13 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+/**
+ * Entity mapper base class that allows an entity to be mapped to oozie bundle.
+ * @param <T>
+ */
public abstract class AbstractOozieEntityMapper<T extends Entity> {
- private static Logger LOG = Logger.getLogger(AbstractOozieEntityMapper.class);
+ private static final Logger LOG = Logger.getLogger(AbstractOozieEntityMapper.class);
protected static final String NOMINAL_TIME_EL = "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}";
@@ -63,11 +67,11 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected static final String MR_QUEUE_NAME = "queueName";
protected static final String MR_JOB_PRIORITY = "jobPriority";
- protected static final JAXBContext workflowJaxbContext;
- protected static final JAXBContext coordJaxbContext;
- protected static final JAXBContext bundleJaxbContext;
+ protected static final JAXBContext WORKFLOW_JAXB_CONTEXT;
+ protected static final JAXBContext COORD_JAXB_CONTEXT;
+ protected static final JAXBContext BUNDLE_JAXB_CONTEXT;
- protected static final FalconPathFilter falconJarFilter = new FalconPathFilter() {
+ protected static final FalconPathFilter FALCON_JAR_FILTER = new FalconPathFilter() {
@Override
public boolean accept(Path path) {
if (path.getName().startsWith("falcon")) {
@@ -88,9 +92,9 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
static {
try {
- workflowJaxbContext = JAXBContext.newInstance(WORKFLOWAPP.class);
- coordJaxbContext = JAXBContext.newInstance(COORDINATORAPP.class);
- bundleJaxbContext = JAXBContext.newInstance(BUNDLEAPP.class);
+ WORKFLOW_JAXB_CONTEXT = JAXBContext.newInstance(WORKFLOWAPP.class);
+ COORD_JAXB_CONTEXT = JAXBContext.newInstance(COORDINATORAPP.class);
+ BUNDLE_JAXB_CONTEXT = JAXBContext.newInstance(BUNDLEAPP.class);
} catch (JAXBException e) {
throw new RuntimeException("Unable to create JAXB context", e);
}
@@ -147,7 +151,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
fs.mkdirs(libPath);
}
- SharedLibraryHostingService.pushLibsToHDFS(libPath.toString(), cluster, falconJarFilter);
+ SharedLibraryHostingService.pushLibsToHDFS(libPath.toString(), cluster, FALCON_JAR_FILTER);
} catch (IOException e) {
LOG.error("Failed to copy shared libs on cluster " + cluster.getName(), e);
throw new FalconException("Failed to copy shared libs on cluster " + cluster.getName(), e);
@@ -158,7 +162,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected org.apache.falcon.oozie.coordinator.CONFIGURATION getCoordConfig(Map<String, String> propMap) {
org.apache.falcon.oozie.coordinator.CONFIGURATION conf
- = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
+ = new org.apache.falcon.oozie.coordinator.CONFIGURATION();
List<org.apache.falcon.oozie.coordinator.CONFIGURATION.Property> props = conf.getProperty();
for (Entry<String, String> prop : propMap.entrySet()) {
props.add(createCoordProperty(prop.getKey(), prop.getValue()));
@@ -219,7 +223,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected org.apache.falcon.oozie.coordinator.CONFIGURATION.Property createCoordProperty(String name,
String value) {
org.apache.falcon.oozie.coordinator.CONFIGURATION.Property prop
- = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
+ = new org.apache.falcon.oozie.coordinator.CONFIGURATION.Property();
prop.setName(name);
prop.setValue(value);
return prop;
@@ -227,14 +231,15 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected org.apache.falcon.oozie.bundle.CONFIGURATION.Property createBundleProperty(String name, String value) {
org.apache.falcon.oozie.bundle.CONFIGURATION.Property prop
- = new org.apache.falcon.oozie.bundle.CONFIGURATION.Property();
+ = new org.apache.falcon.oozie.bundle.CONFIGURATION.Property();
prop.setName(name);
prop.setValue(value);
return prop;
}
protected void marshal(Cluster cluster, JAXBElement<?> jaxbElement, JAXBContext jaxbContext, Path outPath)
- throws FalconException {
+ throws FalconException {
+
try {
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
@@ -274,13 +279,14 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
name = "coordinator";
}
name = name + ".xml";
- marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), coordJaxbContext, new Path(outPath, name));
+ marshal(cluster, new ObjectFactory().createCoordinatorApp(coord), COORD_JAXB_CONTEXT, new Path(outPath, name));
return name;
}
protected void marshal(Cluster cluster, BUNDLEAPP bundle, Path outPath) throws FalconException {
- marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle), bundleJaxbContext,
+ marshal(cluster, new org.apache.falcon.oozie.bundle.ObjectFactory().createBundleApp(bundle),
+ BUNDLE_JAXB_CONTEXT,
new Path(
outPath, "bundle.xml"));
}
@@ -288,7 +294,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected void marshal(Cluster cluster, WORKFLOWAPP workflow, Path outPath) throws FalconException {
marshal(cluster, new org.apache.falcon.oozie.workflow.ObjectFactory().createWorkflowApp(workflow),
- workflowJaxbContext,
+ WORKFLOW_JAXB_CONTEXT,
new Path(outPath, "workflow.xml"));
}
@@ -310,7 +316,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected WORKFLOWAPP getWorkflowTemplate(String template) throws FalconException {
try {
- Unmarshaller unmarshaller = workflowJaxbContext.createUnmarshaller();
+ Unmarshaller unmarshaller = WORKFLOW_JAXB_CONTEXT.createUnmarshaller();
@SuppressWarnings("unchecked")
JAXBElement<WORKFLOWAPP> jaxbElement = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(this.getClass()
.getResourceAsStream(template));
@@ -322,7 +328,7 @@ public abstract class AbstractOozieEntityMapper<T extends Entity> {
protected COORDINATORAPP getCoordinatorTemplate(String template) throws FalconException {
try {
- Unmarshaller unmarshaller = coordJaxbContext.createUnmarshaller();
+ Unmarshaller unmarshaller = COORD_JAXB_CONTEXT.createUnmarshaller();
@SuppressWarnings("unchecked")
JAXBElement<COORDINATORAPP> jaxbElement = (JAXBElement<COORDINATORAPP>) unmarshaller
.unmarshal(AbstractOozieEntityMapper.class.getResourceAsStream(template));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
index 261fc53..a1b0c32 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogMover.java
@@ -17,13 +17,21 @@
*/
package org.apache.falcon.logging;
-import org.apache.commons.cli.*;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
@@ -40,17 +48,23 @@ import java.net.URL;
import java.net.URLConnection;
import java.util.List;
+/**
+ * Utility called in the post process of oozie workflow to move oozie action executor log.
+ */
public class LogMover extends Configured implements Tool {
private static final Logger LOG = Logger.getLogger(LogMover.class);
+ /**
+ * Args to the command.
+ */
private static class ARGS {
- String oozieUrl;
- String subflowId;
- String runId;
- String logDir;
- String status;
- String entityType;
+ private String oozieUrl;
+ private String subflowId;
+ private String runId;
+ private String logDir;
+ private String status;
+ private String entityType;
}
public static void main(String[] args) throws Exception {
@@ -63,7 +77,7 @@ public class LogMover extends Configured implements Tool {
ARGS args = new ARGS();
setupArgs(arguments, args);
OozieClient client = new OozieClient(args.oozieUrl);
- WorkflowJob jobInfo = null;
+ WorkflowJob jobInfo;
try {
jobInfo = client.getJobInfo(args.subflowId);
} catch (OozieClientException e) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
index 8eec0d4..48d4589 100644
--- a/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
+++ b/oozie/src/main/java/org/apache/falcon/logging/LogProvider.java
@@ -37,13 +37,16 @@ import org.mortbay.log.Log;
import java.io.IOException;
+/**
+ * Get oozie action execution logs corresponding to a run as saved by the log mover.
+ */
public final class LogProvider {
private static final Logger LOG = Logger.getLogger(LogProvider.class);
public Instance populateLogUrls(Entity entity, Instance instance,
String runId) throws FalconException {
- Cluster clusterObj = (Cluster) ConfigurationStore.get().get(
+ Cluster clusterObj = ConfigurationStore.get().get(
EntityType.CLUSTER, instance.cluster);
String resolvedRunId = "-";
try {
@@ -72,7 +75,8 @@ public final class LogProvider {
public String getResolvedRunId(FileSystem fs, Cluster cluster,
Entity entity, Instance instance, String runId)
- throws FalconException, IOException {
+ throws FalconException, IOException {
+
if (StringUtils.isEmpty(runId)) {
Path jobPath = new Path(ClusterHelper.getStorageUrl(cluster),
EntityUtil.getLogPath(cluster, entity) + "/job-"
@@ -104,7 +108,7 @@ public final class LogProvider {
private Instance populateActionLogUrls(FileSystem fs, Cluster cluster,
Entity entity, Instance instance, String formatedRunId)
- throws FalconException, OozieClientException, IOException {
+ throws FalconException, OozieClientException, IOException {
Path actionPaths = new Path(ClusterHelper.getStorageUrl(cluster),
EntityUtil.getLogPath(cluster, entity) + "/job-"
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/service/FalconPathFilter.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/FalconPathFilter.java b/oozie/src/main/java/org/apache/falcon/service/FalconPathFilter.java
index 03bcea6..51bbfd4 100644
--- a/oozie/src/main/java/org/apache/falcon/service/FalconPathFilter.java
+++ b/oozie/src/main/java/org/apache/falcon/service/FalconPathFilter.java
@@ -21,6 +21,9 @@ package org.apache.falcon.service;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+/**
+ * Path filter for considering jars for inclusion / exclusion while staging lib in oozie.
+ */
public interface FalconPathFilter extends PathFilter {
String getJarName(Path path);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index 841177a..b6c4f25 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -35,12 +35,15 @@ import org.apache.log4j.Logger;
import java.io.File;
import java.io.IOException;
+/**
+ * Host shared libraries in oozie shared lib dir upon creation or modification of cluster.
+ */
public class SharedLibraryHostingService implements ConfigurationChangeListener {
- private static Logger LOG = Logger.getLogger(SharedLibraryHostingService.class);
+ private static final Logger LOG = Logger.getLogger(SharedLibraryHostingService.class);
private static final String[] LIBS = StartupProperties.get().getProperty("shared.libs").split(",");
- private static final FalconPathFilter nonFalconJarFilter = new FalconPathFilter() {
+ private static final FalconPathFilter NON_FALCON_JAR_FILTER = new FalconPathFilter() {
@Override
public boolean accept(Path path) {
for (String jarName : LIBS) {
@@ -65,14 +68,15 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
private void addLibsTo(Cluster cluster) throws FalconException {
String libLocation = ClusterHelper.getLocation(cluster, "working") + "/lib";
try {
- pushLibsToHDFS(libLocation, cluster, nonFalconJarFilter);
+ pushLibsToHDFS(libLocation, cluster, NON_FALCON_JAR_FILTER);
} catch (IOException e) {
LOG.error("Failed to copy shared libs to cluster " + cluster.getName(), e);
}
}
public static void pushLibsToHDFS(String path, Cluster cluster, FalconPathFilter pathFilter)
- throws IOException, FalconException {
+ throws IOException, FalconException {
+
String localPaths = StartupProperties.get().getProperty("system.lib.location");
assert localPaths != null && !localPaths.isEmpty() : "Invalid value for system.lib.location";
if (!new File(localPaths).isDirectory()) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
index 20ef6bf..2f53370 100644
--- a/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
+++ b/oozie/src/main/java/org/apache/falcon/util/OozieUtils.java
@@ -23,8 +23,13 @@ import java.io.ByteArrayInputStream;
import java.util.Map;
import java.util.Properties;
+/**
+ * Helper methods relating to oozie configuration.
+ */
public final class OozieUtils {
+ private OozieUtils() {}
+
public static Properties toProperties(String properties) {
Configuration conf = new Configuration(false);
conf.addResource(new ByteArrayInputStream(properties.getBytes()));
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
index 4c99bfc..392f145 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/FalconPostProcessing.java
@@ -29,9 +29,15 @@ import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+/**
+ * Utility called by oozie workflow engine post workflow execution in parent workflow.
+ */
public class FalconPostProcessing extends Configured implements Tool {
private static final Logger LOG = Logger.getLogger(FalconPostProcessing.class);
+ /**
+ * Args that the utility understands.
+ */
public enum Arg {
CLUSTER("cluster", "name of the current cluster"),
ENTITY_TYPE("entityType", "type of the entity"),
@@ -170,7 +176,8 @@ public class FalconPostProcessing extends Configured implements Tool {
}
private static CommandLine getCommand(String[] arguments)
- throws ParseException {
+ throws ParseException {
+
Options options = new Options();
addOption(options, Arg.CLUSTER);
addOption(options, Arg.ENTITY_TYPE);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
index 8d67a9b..1978c53 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/OozieWorkflowBuilder.java
@@ -35,10 +35,14 @@ import java.util.Date;
import java.util.List;
import java.util.Properties;
+/**
+ * Base workflow builder for falcon entities.
+ * @param <T>
+ */
public abstract class OozieWorkflowBuilder<T extends Entity> extends WorkflowBuilder<T> {
- private static Logger LOG = Logger.getLogger(OozieWorkflowBuilder.class);
- protected static final ConfigurationStore configStore = ConfigurationStore.get();
+ private static final Logger LOG = Logger.getLogger(OozieWorkflowBuilder.class);
+ protected static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
protected Properties createAppProperties(String clusterName, Path bundlePath, String user) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/workflow/engine/NullBundleJob.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/NullBundleJob.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/NullBundleJob.java
index f2c9ac3..aead7eb 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/NullBundleJob.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/NullBundleJob.java
@@ -24,6 +24,9 @@ import org.apache.oozie.client.CoordinatorJob;
import java.util.Date;
import java.util.List;
+/**
+ * Default Bundle Job.
+ */
public class NullBundleJob implements BundleJob {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/workflow/engine/NullCoordJob.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/NullCoordJob.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/NullCoordJob.java
index 0d0c816..c93f543 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/NullCoordJob.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/NullCoordJob.java
@@ -24,6 +24,9 @@ import org.apache.oozie.client.CoordinatorJob;
import java.util.Date;
import java.util.List;
+/**
+ * Default coord job.
+ */
public class NullCoordJob implements CoordinatorJob {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index a335418..175b832 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -29,28 +29,32 @@ import org.apache.oozie.client.OozieClient;
import java.util.concurrent.ConcurrentHashMap;
-//import org.apache.oozie.local.LocalOozie;
-
-public class OozieClientFactory {
+/**
+ * Factory for providing appropriate oozie client.
+ */
+public final class OozieClientFactory {
private static final Logger LOG = Logger.getLogger(OozieClientFactory.class);
- private static final ConcurrentHashMap<String, OozieClient> cache =
- new ConcurrentHashMap<String, OozieClient>();
+ private static final ConcurrentHashMap<String, OozieClient> CACHE =
+ new ConcurrentHashMap<String, OozieClient>();
private static final String LOCAL_OOZIE = "local";
private static volatile boolean localInitialized = false;
- public synchronized static OozieClient get(Cluster cluster)
- throws FalconException {
+ private OozieClientFactory() {}
+
+ public static synchronized OozieClient get(Cluster cluster)
+ throws FalconException {
+
assert cluster != null : "Cluster cant be null";
String oozieUrl = ClusterHelper.getOozieUrl(cluster);
- if (!cache.containsKey(oozieUrl)) {
+ if (!CACHE.containsKey(oozieUrl)) {
OozieClient ref = getClientRef(oozieUrl);
LOG.info("Caching Oozie client object for " + oozieUrl);
- cache.putIfAbsent(oozieUrl, ref);
+ CACHE.putIfAbsent(oozieUrl, ref);
return ref;
} else {
- return cache.get(oozieUrl);
+ return CACHE.get(oozieUrl);
}
}
@@ -59,7 +63,8 @@ public class OozieClientFactory {
}
private static OozieClient getClientRef(String oozieUrl)
- throws FalconException {
+ throws FalconException {
+
if (LOCAL_OOZIE.equals(oozieUrl)) {
return getLocalOozieClient();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index dd18f9f..7e2f8a4 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -29,9 +29,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
+/**
+ * Service that cleans up artifacts that falcon dropped on hdfs for oozie's use.
+ */
public class OozieHouseKeepingService implements WorkflowEngineActionListener {
- private static Logger LOG = Logger.getLogger(OozieHouseKeepingService.class);
+ private static final Logger LOG = Logger.getLogger(OozieHouseKeepingService.class);
@Override
public void beforeSchedule(Entity entity, String cluster) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index f8da808..a75ad74 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -41,7 +41,7 @@ import java.util.*;
import java.util.Map.Entry;
/**
- * Workflow engine which uses oozies APIs
+ * Workflow engine which uses Oozie's APIs.
*/
public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@@ -76,7 +76,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
private static final String[] BUNDLE_UPDATEABLE_PROPS = new String[]{
- "parallel", "clusters.clusters[\\d+].validity.end"};
+ "parallel", "clusters.clusters[\\d+].validity.end", };
public OozieWorkflowEngine() {
registerListener(new OozieHouseKeepingService());
@@ -126,39 +126,41 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private boolean isBundleInState(Entity entity, BundleStatus status)
- throws FalconException {
+ throws FalconException {
+
Map<String, BundleJob> bundles = findLatestBundle(entity);
for (BundleJob bundle : bundles.values()) {
- if (bundle == MISSING) // There is no active bundle
- {
+ if (bundle == MISSING) {// There is no active bundle
return false;
}
switch (status) {
- case ACTIVE:
- if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus())) {
- return false;
- }
- break;
+ case ACTIVE:
+ if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus())) {
+ return false;
+ }
+ break;
- case RUNNING:
- if (!BUNDLE_RUNNING_STATUS.contains(bundle.getStatus())) {
- return false;
- }
- break;
+ case RUNNING:
+ if (!BUNDLE_RUNNING_STATUS.contains(bundle.getStatus())) {
+ return false;
+ }
+ break;
- case SUSPENDED:
- if (!BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus())) {
- return false;
- }
- break;
+ case SUSPENDED:
+ if (!BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus())) {
+ return false;
+ }
+ break;
+ default:
}
}
return true;
}
private BundleJob findBundle(Entity entity, String cluster)
- throws FalconException {
+ throws FalconException {
+
String stPath = EntityUtil.getStagingPath(entity);
LOG.info("Staging path for entity " + stPath);
List<BundleJob> bundles = findBundles(entity, cluster);
@@ -171,7 +173,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private List<BundleJob> findBundles(Entity entity, String cluster)
- throws FalconException {
+ throws FalconException {
+
try {
OozieClient client = OozieClientFactory.get(cluster);
List<BundleJob> jobs = client.getBundleJobsInfo(
@@ -194,7 +197,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private Map<String, List<BundleJob>> findBundles(Entity entity)
- throws FalconException {
+ throws FalconException {
+
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
Map<String, List<BundleJob>> jobMap = new HashMap<String, List<BundleJob>>();
for (String cluster : clusters) {
@@ -208,7 +212,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
// In this case, there will not be a bundle with the latest entity md5
// So, pick last created bundle
private Map<String, BundleJob> findLatestBundle(Entity entity)
- throws FalconException {
+ throws FalconException {
+
Map<String, List<BundleJob>> bundlesMap = findBundles(entity);
Map<String, BundleJob> bundleMap = new HashMap<String, BundleJob>();
for (String cluster : bundlesMap.keySet()) {
@@ -271,7 +276,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private String doBundleAction(Entity entity, BundleAction action, String cluster)
- throws FalconException {
+ throws FalconException {
+
boolean success = true;
List<BundleJob> jobs = findBundles(entity, cluster);
if (jobs.isEmpty()) {
@@ -282,29 +288,30 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
beforeAction(entity, action, cluster);
for (BundleJob job : jobs) {
switch (action) {
- case SUSPEND:
- // not already suspended and preconditions are true
- if (!BUNDLE_SUSPENDED_STATUS.contains(job.getStatus())
- && BUNDLE_SUSPEND_PRECOND.contains(job.getStatus())) {
- suspend(cluster, job.getId());
- success = true;
- }
- break;
-
- case RESUME:
- // not already running and preconditions are true
- if (!BUNDLE_RUNNING_STATUS.contains(job.getStatus())
- && BUNDLE_RESUME_PRECOND.contains(job.getStatus())) {
- resume(cluster, job.getId());
- success = true;
- }
- break;
+ case SUSPEND:
+ // not already suspended and preconditions are true
+ if (!BUNDLE_SUSPENDED_STATUS.contains(job.getStatus())
+ && BUNDLE_SUSPEND_PRECOND.contains(job.getStatus())) {
+ suspend(cluster, job.getId());
+ success = true;
+ }
+ break;
- case KILL:
- // not already killed and preconditions are true
- killBundle(cluster, job);
+ case RESUME:
+ // not already running and preconditions are true
+ if (!BUNDLE_RUNNING_STATUS.contains(job.getStatus())
+ && BUNDLE_RESUME_PRECOND.contains(job.getStatus())) {
+ resume(cluster, job.getId());
success = true;
- break;
+ }
+ break;
+
+ case KILL:
+ // not already killed and preconditions are true
+ killBundle(cluster, job);
+ success = true;
+ break;
+ default:
}
afterAction(entity, action, cluster);
}
@@ -333,46 +340,51 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private void beforeAction(Entity entity, BundleAction action, String cluster)
- throws FalconException {
+ throws FalconException {
+
for (WorkflowEngineActionListener listener : listeners) {
switch (action) {
- case SUSPEND:
- listener.beforeSuspend(entity, cluster);
- break;
+ case SUSPEND:
+ listener.beforeSuspend(entity, cluster);
+ break;
- case RESUME:
- listener.beforeResume(entity, cluster);
- break;
+ case RESUME:
+ listener.beforeResume(entity, cluster);
+ break;
- case KILL:
- listener.beforeDelete(entity, cluster);
- break;
+ case KILL:
+ listener.beforeDelete(entity, cluster);
+ break;
+ default:
}
}
}
private void afterAction(Entity entity, BundleAction action, String cluster)
- throws FalconException {
+ throws FalconException {
+
for (WorkflowEngineActionListener listener : listeners) {
switch (action) {
- case SUSPEND:
- listener.afterSuspend(entity, cluster);
- break;
+ case SUSPEND:
+ listener.afterSuspend(entity, cluster);
+ break;
- case RESUME:
- listener.afterResume(entity, cluster);
- break;
+ case RESUME:
+ listener.afterResume(entity, cluster);
+ break;
- case KILL:
- listener.afterDelete(entity, cluster);
- break;
+ case KILL:
+ listener.afterDelete(entity, cluster);
+ break;
+ default:
}
}
}
@Override
public InstancesResult getRunningInstances(Entity entity)
- throws FalconException {
+ throws FalconException {
+
try {
WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(
ENGINE, entity);
@@ -447,7 +459,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public InstancesResult getStatus(Entity entity, Date start, Date end)
- throws FalconException {
+ throws FalconException {
+
return doJobAction(JobAction.STATUS, entity, start, end, null);
}
@@ -456,7 +469,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private WorkflowJob getWorkflowInfo(String cluster, String wfId)
- throws FalconException {
+ throws FalconException {
+
OozieClient client = OozieClientFactory.get(cluster);
try {
return client.getJobInfo(wfId);
@@ -546,53 +560,55 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private String performAction(JobAction action, Properties props,
String cluster, String status, WorkflowJob jobInfo)
- throws FalconException {
- switch (action) {
- case KILL:
- if (!WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
- break;
- }
+ throws FalconException {
- kill(cluster, jobInfo.getId());
- status = Status.KILLED.name();
+ switch (action) {
+ case KILL:
+ if (!WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
break;
+ }
- case SUSPEND:
- if (!WF_SUSPEND_PRECOND.contains(jobInfo.getStatus())) {
- break;
- }
+ kill(cluster, jobInfo.getId());
+ status = Status.KILLED.name();
+ break;
- suspend(cluster, jobInfo.getId());
- status = Status.SUSPENDED.name();
+ case SUSPEND:
+ if (!WF_SUSPEND_PRECOND.contains(jobInfo.getStatus())) {
break;
+ }
- case RESUME:
- if (!WF_RESUME_PRECOND.contains(jobInfo.getStatus())) {
- break;
- }
+ suspend(cluster, jobInfo.getId());
+ status = Status.SUSPENDED.name();
+ break;
- resume(cluster, jobInfo.getId());
- status = Status.RUNNING.name();
+ case RESUME:
+ if (!WF_RESUME_PRECOND.contains(jobInfo.getStatus())) {
break;
+ }
- case RERUN:
- if (!WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
- break;
- }
+ resume(cluster, jobInfo.getId());
+ status = Status.RUNNING.name();
+ break;
- reRun(cluster, jobInfo.getId(), props);
- status = Status.RUNNING.name();
+ case RERUN:
+ if (!WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
break;
+ }
- case STATUS:
- break;
+ reRun(cluster, jobInfo.getId(), props);
+ status = Status.RUNNING.name();
+ break;
+
+ case STATUS:
+ break;
+ default:
}
return status;
}
private String getSourceCluster(String cluster,
CoordinatorAction coordinatorAction, Entity entity)
- throws FalconException {
+ throws FalconException {
OozieClient client = OozieClientFactory.get(cluster);
CoordinatorJob coordJob;
@@ -656,8 +672,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
Date iterStart = EntityUtil.getNextStartTime(
coord.getStartTime(), freq, tz, start);
- final Date iterEnd = (coord.getNextMaterializedTime().before(end) ? coord.getNextMaterializedTime() :
- end);
+ Date iterEnd = (coord.getNextMaterializedTime().before(end) ? coord.getNextMaterializedTime() : end);
while (!iterStart.after(iterEnd)) {
int sequence = EntityUtil.getInstanceSequence(
coord.getStartTime(), freq, tz, iterStart);
@@ -690,6 +705,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
.getFalconTimeUnit());
}
+ /**
+ * TimeUnit as understood by Oozie.
+ */
private enum OozieTimeUnit {
MINUTE(TimeUnit.minutes), HOUR(TimeUnit.hours), DAY(TimeUnit.days), WEEK(
null), MONTH(TimeUnit.months), END_OF_DAY(null), END_OF_MONTH(
@@ -712,7 +730,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private List<CoordinatorJob> getApplicableCoords(Entity entity,
OozieClient client, Date start, Date end, List<BundleJob> bundles)
- throws FalconException {
+ throws FalconException {
+
List<CoordinatorJob> applicableCoords = new ArrayList<CoordinatorJob>();
try {
for (BundleJob bundle : bundles) {
@@ -751,7 +770,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private boolean canUpdateBundle(Entity oldEntity, Entity newEntity)
- throws FalconException {
+ throws FalconException {
return EntityUtil.equals(oldEntity, newEntity, BUNDLE_UPDATEABLE_PROPS);
}
@@ -894,7 +913,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private void suspend(String cluster, BundleJob bundle)
- throws FalconException {
+ throws FalconException {
+
bundle = getBundleInfo(cluster, bundle.getId());
for (CoordinatorJob coord : bundle.getCoordinators()) {
suspend(cluster, coord.getId());
@@ -909,7 +929,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private void updateInternal(Entity oldEntity, Entity newEntity,
String cluster, BundleJob bundle, boolean alreadyCreated)
- throws FalconException {
+ throws FalconException {
+
OozieWorkflowBuilder<Entity> builder = (OozieWorkflowBuilder<Entity>) WorkflowBuilder
.getBuilder(ENGINE, oldEntity);
@@ -949,7 +970,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private void scheduleForUpdate(Entity entity, String cluster, Date startDate, String user)
- throws FalconException {
+ throws FalconException {
+
WorkflowBuilder<Entity> builder = WorkflowBuilder.getBuilder(ENGINE,
entity);
Properties bundleProps = builder.newWorkflowSchedule(entity, startDate,
@@ -977,7 +999,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private BundleJob getBundleInfo(String cluster, String bundleId)
- throws FalconException {
+ throws FalconException {
+
OozieClient client = OozieClientFactory.get(cluster);
try {
return client.getBundleJobInfo(bundleId);
@@ -1006,7 +1029,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public void reRun(String cluster, String jobId, Properties props)
- throws FalconException {
+ throws FalconException {
+
OozieClient client = OozieClientFactory.get(cluster);
try {
WorkflowJob jobInfo = client.getJobInfo(jobId);
@@ -1030,13 +1054,15 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private void assertStatus(String cluster, String jobId, Status... statuses)
- throws FalconException {
+ throws FalconException {
+
String actualStatus = getWorkflowStatus(cluster, jobId);
for (int counter = 0; counter < 3; counter++) {
if (!statusEquals(actualStatus, statuses)) {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
+ //ignore
}
} else {
return;
@@ -1059,7 +1085,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public String getWorkflowStatus(String cluster, String jobId)
- throws FalconException {
+ throws FalconException {
+
OozieClient client = OozieClientFactory.get(cluster);
try {
if (jobId.endsWith("-W")) {
@@ -1141,7 +1168,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
private void change(String cluster, String jobId, String changeValue)
- throws FalconException {
+ throws FalconException {
+
try {
OozieClient client = OozieClientFactory.get(cluster);
client.change(jobId, changeValue);
@@ -1183,13 +1211,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? null
: SchemaHelper.parseDateUTC(pauseTime));
if (coord.getConcurrency() != concurrency
- || (endTime != null && !coord.getEndTime().equals(
- endTime))
- || (intendedPauseTime != null && !intendedPauseTime
- .equals(coord.getPauseTime()))) {
+ || (endTime != null && !coord.getEndTime().equals(endTime))
+ || (intendedPauseTime != null && !intendedPauseTime.equals(coord.getPauseTime()))) {
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {
+ //ignore
}
} else {
return;
@@ -1223,7 +1250,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@Override
public InstancesResult getJobDetails(String cluster, String jobId)
- throws FalconException {
+ throws FalconException {
+
OozieClient client = OozieClientFactory.get(cluster);
Instance[] instances = new Instance[1];
Instance instance = new Instance();
@@ -1244,5 +1272,4 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
index 4998b0d..7634984 100644
--- a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
@@ -32,9 +32,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+/**
+ * Wrapped Oozie Client.
+ */
public class CustomOozieClient extends OozieClient {
- private static final Map<String, String> none = new HashMap<String, String>();
+ private static final Map<String, String> NONE = new HashMap<String, String>();
public CustomOozieClient(String oozieUrl) {
super(oozieUrl);
@@ -64,7 +67,7 @@ public class CustomOozieClient extends OozieClient {
private class OozieConfiguration extends ClientCallable<Properties> {
public OozieConfiguration(String resource) {
- super("GET", RestConstants.ADMIN, resource, none);
+ super("GET", RestConstants.ADMIN, resource, NONE);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java
index 97dca12..8e9d37c 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java
@@ -28,6 +28,9 @@ import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
+/**
+ * Test to verify if the bundle xml is getting unmarshalled.
+ */
public class BundleUnmarshallingTest {
@Test
public void testValidBundleUnamrashalling() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java
index da8b626..f3deac7 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java
@@ -29,7 +29,7 @@ import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
/**
- * Class to test if generated coordinator.xml is valid
+ * Class to test if generated coordinator.xml is valid.
*/
public class CoordinatorUnmarshallingTest {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index 169c5d3..5ac8006 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -28,6 +28,9 @@ import org.testng.annotations.Test;
import javax.jms.*;
+/**
+ * Test for validating the falcon post processing utility.
+ */
public class FalconPostProcessingTest {
private String[] args;
@@ -65,7 +68,7 @@ public class FalconPostProcessingTest {
"-" + Arg.WF_ENGINE_URL.getOptionName(),
"http://localhost:11000/oozie/",
"-" + Arg.LOG_DIR.getOptionName(), "target/log",
- "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test"};
+ "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test", };
broker = new BrokerService();
broker.addConnector(BROKER_URL);
broker.setDataDirectory("target/activemq");
@@ -91,7 +94,7 @@ public class FalconPostProcessingTest {
} catch (AssertionError e) {
error = e;
} catch (JMSException ignore) {
-
+ error = null;
}
}
};
@@ -117,7 +120,7 @@ public class FalconPostProcessingTest {
// wait till you get atleast one message
MapMessage m;
- for (m = null; m == null; ) {
+ for (m = null; m == null;) {
m = (MapMessage) consumer.receive();
}
System.out.println("Consumed: " + m.toString());
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java
index e17d377..54bba62 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java
@@ -29,6 +29,9 @@ import javax.xml.bind.Unmarshaller;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
+/**
+ * Test to verify workflow xml unmarshalling.
+ */
public class WorkflowUnmarshallingTest {
@Test
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/9b4d845f/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java b/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java
index 897a86d..cf0b385 100644
--- a/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java
+++ b/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java
@@ -22,6 +22,9 @@ import org.testng.annotations.Test;
import java.util.Properties;
+/**
+ * Test to verify if the oozie client provided via CustomOozieClient is valid.
+ */
public class CustomOozieClientTest {
@Test(enabled = false)