You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/04/26 17:50:28 UTC
[12/47] Fixes for Checkstyle
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
index ad05a77..a335418 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java
@@ -26,13 +26,14 @@ import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.log4j.Logger;
import org.apache.oozie.client.CustomOozieClient;
import org.apache.oozie.client.OozieClient;
-//import org.apache.oozie.local.LocalOozie;
import java.util.concurrent.ConcurrentHashMap;
+//import org.apache.oozie.local.LocalOozie;
+
public class OozieClientFactory {
- private static final Logger LOG = Logger.getLogger(OozieClientFactory.class);
+ private static final Logger LOG = Logger.getLogger(OozieClientFactory.class);
private static final ConcurrentHashMap<String, OozieClient> cache =
new ConcurrentHashMap<String, OozieClient>();
@@ -56,6 +57,7 @@ public class OozieClientFactory {
public static OozieClient get(String cluster) throws FalconException {
return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, cluster));
}
+
private static OozieClient getClientRef(String oozieUrl)
throws FalconException {
if (LOCAL_OOZIE.equals(oozieUrl)) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
index 8e10353..dd18f9f 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieHouseKeepingService.java
@@ -18,15 +18,15 @@
package org.apache.falcon.workflow.engine;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
public class OozieHouseKeepingService implements WorkflowEngineActionListener {
@@ -49,16 +49,18 @@ public class OozieHouseKeepingService implements WorkflowEngineActionListener {
public void afterDelete(Entity entity, String clusterName) throws FalconException {
try {
Cluster cluster = EntityUtil.getEntity(EntityType.CLUSTER, clusterName);
- Path entityPath = new Path(ClusterHelper.getLocation(cluster, "staging"), EntityUtil.getStagingPath(entity)).getParent();
+ Path entityPath = new Path(ClusterHelper.getLocation(cluster, "staging"),
+ EntityUtil.getStagingPath(entity)).getParent();
LOG.info("Deleting entity path " + entityPath + " on cluster " + clusterName);
-
+
Configuration conf = ClusterHelper.getConfiguration(cluster);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(entityPath) && !fs.delete(entityPath, true)) {
throw new FalconException("Unable to cleanup entity path: " + entityPath);
}
} catch (Exception e) {
- throw new FalconException("Failed to cleanup entity path for " + entity.toShortString() + " on cluster " + clusterName, e);
+ throw new FalconException(
+ "Failed to cleanup entity path for " + entity.toShortString() + " on cluster " + clusterName, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
index 8f6901a..f8da808 100644
--- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
+++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java
@@ -18,30 +18,12 @@
package org.apache.falcon.workflow.engine;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TimeZone;
-
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.EntityUtil;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.entity.v0.EntityGraph;
-import org.apache.falcon.entity.v0.EntityType;
-import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.*;
import org.apache.falcon.entity.v0.Frequency.TimeUnit;
-import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesResult.Instance;
@@ -55,9 +37,11 @@ import org.apache.oozie.client.*;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.client.WorkflowJob.Status;
+import java.util.*;
+import java.util.Map.Entry;
+
/**
* Workflow engine which uses oozies APIs
- *
*/
public class OozieWorkflowEngine extends AbstractWorkflowEngine {
@@ -91,8 +75,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
private static final String FALCON_INSTANCE_SOURCE_CLUSTERS = "falcon.instance.source.clusters";
- private static final String[] BUNDLE_UPDATEABLE_PROPS = new String[] {
- "parallel", "clusters.clusters[\\d+].validity.end" };
+ private static final String[] BUNDLE_UPDATEABLE_PROPS = new String[]{
+ "parallel", "clusters.clusters[\\d+].validity.end"};
public OozieWorkflowEngine() {
registerListener(new OozieHouseKeepingService());
@@ -146,22 +130,27 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
Map<String, BundleJob> bundles = findLatestBundle(entity);
for (BundleJob bundle : bundles.values()) {
if (bundle == MISSING) // There is no active bundle
+ {
return false;
+ }
switch (status) {
case ACTIVE:
- if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus()))
+ if (!BUNDLE_ACTIVE_STATUS.contains(bundle.getStatus())) {
return false;
+ }
break;
case RUNNING:
- if (!BUNDLE_RUNNING_STATUS.contains(bundle.getStatus()))
+ if (!BUNDLE_RUNNING_STATUS.contains(bundle.getStatus())) {
return false;
+ }
break;
case SUSPENDED:
- if (!BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus()))
+ if (!BUNDLE_SUSPENDED_STATUS.contains(bundle.getStatus())) {
return false;
+ }
break;
}
}
@@ -190,11 +179,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
+ EntityUtil.getWorkflowName(entity) + ";", 0, 256);
if (jobs != null) {
List<BundleJob> filteredJobs = new ArrayList<BundleJob>();
- for(BundleJob job : jobs)
- if(job.getStatus() != Job.Status.KILLED || job.getEndTime() == null) {
+ for (BundleJob job : jobs) {
+ if (job.getStatus() != Job.Status.KILLED || job.getEndTime() == null) {
filteredJobs.add(job);
LOG.debug("Found bundle " + job.getId());
}
+ }
return filteredJobs;
}
return new ArrayList<BundleJob>();
@@ -224,11 +214,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
for (String cluster : bundlesMap.keySet()) {
Date latest = null;
bundleMap.put(cluster, MISSING);
- for (BundleJob job : bundlesMap.get(cluster))
+ for (BundleJob job : bundlesMap.get(cluster)) {
if (latest == null || latest.before(job.getCreatedTime())) {
bundleMap.put(cluster, job);
latest = job.getCreatedTime();
}
+ }
}
return bundleMap;
}
@@ -237,11 +228,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
List<BundleJob> bundles = findBundles(entity, cluster);
Date latest = null;
BundleJob bundle = MISSING;
- for (BundleJob job : bundles)
+ for (BundleJob job : bundles) {
if (latest == null || latest.before(job.getCreatedTime())) {
bundle = job;
latest = job.getCreatedTime();
}
+ }
return bundle;
}
@@ -272,8 +264,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private String doBundleAction(Entity entity, BundleAction action) throws FalconException {
Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity);
String result = null;
- for(String cluster:clusters)
+ for (String cluster : clusters) {
result = doBundleAction(entity, action, cluster);
+ }
return result;
}
@@ -322,7 +315,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
OozieClient client = OozieClientFactory.get(cluster);
try {
//kill all coords
- for(CoordinatorJob coord:job.getCoordinators()) {
+ for (CoordinatorJob coord : job.getCoordinators()) {
client.kill(coord.getId());
LOG.debug("Killed coord " + coord.getId() + " on cluster " + cluster);
}
@@ -389,8 +382,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
List<String> coordNames = new ArrayList<String>();
for (String wfName : wfNames) {
if (EntityUtil.getWorkflowName(Tag.RETENTION, entity)
- .toString().equals(wfName))
+ .toString().equals(wfName)) {
continue;
+ }
coordNames.add(wfName);
}
@@ -400,8 +394,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
if (wfs != null) {
for (WorkflowJob job : wfs) {
WorkflowJob wf = client.getJobInfo(job.getId());
- if (StringUtils.isEmpty(wf.getParentId()))
+ if (StringUtils.isEmpty(wf.getParentId())) {
continue;
+ }
CoordinatorAction action = client.getCoordActionInfo(wf
.getParentId());
@@ -484,8 +479,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
List<Instance> instances = new ArrayList<Instance>();
for (String cluster : actionsMap.keySet()) {
- if (clusterList.size() != 0 && !clusterList.contains(cluster))
+ if (clusterList.size() != 0 && !clusterList.contains(cluster)) {
continue;
+ }
List<CoordinatorAction> actions = actionsMap.get(cluster);
String sourceCluster = null;
@@ -494,8 +490,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
sourceCluster = getSourceCluster(cluster,
coordinatorAction, entity);
if (sourceClusterList.size() != 0
- && !sourceClusterList.contains(sourceCluster))
+ && !sourceClusterList.contains(sourceCluster)) {
continue;
+ }
}
String status = mapActionStatus(coordinatorAction.getStatus());
WorkflowJob jobInfo = null;
@@ -552,32 +549,36 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
throws FalconException {
switch (action) {
case KILL:
- if (!WF_KILL_PRECOND.contains(jobInfo.getStatus()))
+ if (!WF_KILL_PRECOND.contains(jobInfo.getStatus())) {
break;
+ }
kill(cluster, jobInfo.getId());
status = Status.KILLED.name();
break;
case SUSPEND:
- if (!WF_SUSPEND_PRECOND.contains(jobInfo.getStatus()))
+ if (!WF_SUSPEND_PRECOND.contains(jobInfo.getStatus())) {
break;
+ }
suspend(cluster, jobInfo.getId());
status = Status.SUSPENDED.name();
break;
case RESUME:
- if (!WF_RESUME_PRECOND.contains(jobInfo.getStatus()))
+ if (!WF_RESUME_PRECOND.contains(jobInfo.getStatus())) {
break;
+ }
resume(cluster, jobInfo.getId());
status = Status.RUNNING.name();
break;
case RERUN:
- if (!WF_RERUN_PRECOND.contains(jobInfo.getStatus()))
+ if (!WF_RERUN_PRECOND.contains(jobInfo.getStatus())) {
break;
+ }
reRun(cluster, jobInfo.getId(), props);
status = Status.RUNNING.name();
@@ -655,7 +656,8 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
TimeZone tz = EntityUtil.getTimeZone(coord.getTimeZone());
Date iterStart = EntityUtil.getNextStartTime(
coord.getStartTime(), freq, tz, start);
- final Date iterEnd = (coord.getNextMaterializedTime().before(end) ? coord.getNextMaterializedTime() : end);
+ final Date iterEnd = (coord.getNextMaterializedTime().before(end) ? coord.getNextMaterializedTime() :
+ end);
while (!iterStart.after(iterEnd)) {
int sequence = EntityUtil.getInstanceSequence(
coord.getStartTime(), freq, tz, iterStart);
@@ -700,9 +702,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
}
public TimeUnit getFalconTimeUnit() {
- if (falconTimeUnit == null)
+ if (falconTimeUnit == null) {
throw new IllegalStateException("Invalid coord frequency: "
+ name());
+ }
return falconTimeUnit;
}
}
@@ -718,8 +721,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
for (CoordinatorJob coord : coords) {
String coordName = EntityUtil.getWorkflowName(
Tag.RETENTION, entity).toString();
- if (coordName.equals(coord.getAppName()))
+ if (coordName.equals(coord.getAppName())) {
continue;
+ }
// if end time is before coord-start time or start time is
// after coord-end time ignore.
if (!(end.compareTo(coord.getStartTime()) <= 0 || start
@@ -787,16 +791,19 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
//Update affected entities
Set<Entity> affectedEntities = EntityGraph.get().getDependents(oldEntity);
for (Entity affectedEntity : affectedEntities) {
- if (affectedEntity.getEntityType() != EntityType.PROCESS)
+ if (affectedEntity.getEntityType() != EntityType.PROCESS) {
continue;
+ }
LOG.info("Dependent entities need to be updated " + affectedEntity.toShortString());
- if (!UpdateHelper.shouldUpdate(oldEntity, newEntity, affectedEntity))
+ if (!UpdateHelper.shouldUpdate(oldEntity, newEntity, affectedEntity)) {
continue;
+ }
BundleJob affectedProcBundle = findLatestBundle(affectedEntity, cluster);
- if (affectedProcBundle == MISSING)
+ if (affectedProcBundle == MISSING) {
continue;
+ }
LOG.info("Triggering update for " + cluster + ", " + affectedProcBundle.getId());
@@ -843,10 +850,11 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private void updateCoords(String cluster, String bundleId, int concurrency,
Date endTime) throws FalconException {
- if (endTime.compareTo(now()) <= 0)
+ if (endTime.compareTo(now()) <= 0) {
throw new FalconException("End time "
+ SchemaHelper.formatDateUTC(endTime)
+ " can't be in the past");
+ }
BundleJob bundle = getBundleInfo(cluster, bundleId);
// change coords
@@ -929,9 +937,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
LOG.info("Will set old coord end time to "
+ SchemaHelper.formatDateUTC(endTime));
}
- if (endTime != null)
+ if (endTime != null) {
updateCoords(cluster, bundle.getId(),
EntityUtil.getParallel(oldEntity), endTime);
+ }
if (oldBundleStatus != Job.Status.SUSPENDED
&& oldBundleStatus != Job.Status.PREPSUSPENDED) {
@@ -957,10 +966,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private Date getMinStartTime(BundleJob bundle) {
Date startTime = null;
- if (bundle.getCoordinators() != null)
- for (CoordinatorJob coord : bundle.getCoordinators())
- if (startTime == null || startTime.after(coord.getStartTime()))
+ if (bundle.getCoordinators() != null) {
+ for (CoordinatorJob coord : bundle.getCoordinators()) {
+ if (startTime == null || startTime.after(coord.getStartTime())) {
startTime = coord.getStartTime();
+ }
+ }
+ }
return startTime;
}
@@ -979,9 +991,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
StringBuilder filter = new StringBuilder();
filter.append(OozieClient.FILTER_STATUS).append('=')
.append(Job.Status.RUNNING.name());
- for (String wfName : wfNames)
+ for (String wfName : wfNames) {
filter.append(';').append(OozieClient.FILTER_NAME).append('=')
.append(wfName);
+ }
OozieClient client = OozieClientFactory.get(cluster);
try {
@@ -998,12 +1011,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
try {
WorkflowJob jobInfo = client.getJobInfo(jobId);
Properties jobprops = OozieUtils.toProperties(jobInfo.getConf());
- if (props == null || props.isEmpty())
+ if (props == null || props.isEmpty()) {
jobprops.put(OozieClient.RERUN_FAIL_NODES, "false");
- else
+ } else {
for (Entry<Object, Object> entry : props.entrySet()) {
jobprops.put(entry.getKey(), entry.getValue());
}
+ }
jobprops.remove(OozieClient.COORDINATOR_APP_PATH);
jobprops.remove(OozieClient.BUNDLE_APP_PATH);
client.reRun(jobId, jobprops);
@@ -1036,8 +1050,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
private boolean statusEquals(String left, Status... right) {
for (Status rightElement : right) {
- if (left.equals(rightElement.name()))
+ if (left.equals(rightElement.name())) {
return true;
+ }
}
return false;
}
@@ -1047,13 +1062,13 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
throws FalconException {
OozieClient client = OozieClientFactory.get(cluster);
try {
- if(jobId.endsWith("-W")) {
+ if (jobId.endsWith("-W")) {
WorkflowJob jobInfo = client.getJobInfo(jobId);
return jobInfo.getStatus().name();
- } else if(jobId.endsWith("-C")) {
+ } else if (jobId.endsWith("-C")) {
CoordinatorJob coord = client.getCoordJobInfo(jobId);
return coord.getStatus().name();
- } else if(jobId.endsWith("-B")) {
+ } else if (jobId.endsWith("-B")) {
BundleJob bundle = client.getBundleJobInfo(jobId);
return bundle.getStatus().name();
}
@@ -1147,14 +1162,16 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine {
changeValue.append(OozieClient.CHANGE_VALUE_ENDTIME).append("=")
.append(endTimeStr).append(";");
}
- if (pauseTime != null)
+ if (pauseTime != null) {
changeValue.append(OozieClient.CHANGE_VALUE_PAUSETIME).append("=")
.append(pauseTime);
+ }
String changeValueStr = changeValue.toString();
- if (changeValue.toString().endsWith(";"))
+ if (changeValue.toString().endsWith(";")) {
changeValueStr = changeValue.substring(0,
changeValueStr.length() - 1);
+ }
change(cluster, id, changeValueStr);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
index 5895d51..4998b0d 100644
--- a/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
+++ b/oozie/src/main/java/org/apache/oozie/client/CustomOozieClient.java
@@ -51,16 +51,16 @@ public class CustomOozieClient extends OozieClient {
@Override
protected HttpURLConnection createConnection(URL url, String method) throws IOException, OozieClientException {
HttpURLConnection conn = super.createConnection(url, method);
-
+
int connectTimeout = Integer.valueOf(RuntimeProperties.get().getProperty("oozie.connect.timeout", "1000"));
conn.setConnectTimeout(connectTimeout);
int readTimeout = Integer.valueOf(RuntimeProperties.get().getProperty("oozie.read.timeout", "45000"));
conn.setReadTimeout(readTimeout);
-
+
return conn;
}
-
+
private class OozieConfiguration extends ClientCallable<Properties> {
public OozieConfiguration(String resource) {
@@ -78,8 +78,7 @@ public class CustomOozieClient extends OozieClient {
props.put(key, json.get(key));
}
return props;
- }
- else {
+ } else {
handleError(conn);
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/main/resources/oozie-bundle-0.1.xsd
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/oozie-bundle-0.1.xsd b/oozie/src/main/resources/oozie-bundle-0.1.xsd
index fbcd41c..41534b3 100644
--- a/oozie/src/main/resources/oozie-bundle-0.1.xsd
+++ b/oozie/src/main/resources/oozie-bundle-0.1.xsd
@@ -38,7 +38,7 @@
</xs:sequence>
</xs:complexType>
<xs:complexType name="COORDINATOR">
- <xs:sequence minOccurs="1" maxOccurs="1">
+ <xs:sequence minOccurs="1" maxOccurs="1">
<xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
<xs:element name="configuration" type="bundle:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
</xs:sequence>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/main/resources/oozie-workflow-0.3.xsd
----------------------------------------------------------------------
diff --git a/oozie/src/main/resources/oozie-workflow-0.3.xsd b/oozie/src/main/resources/oozie-workflow-0.3.xsd
index 06d2013..8cda3d9 100644
--- a/oozie/src/main/resources/oozie-workflow-0.3.xsd
+++ b/oozie/src/main/resources/oozie-workflow-0.3.xsd
@@ -126,7 +126,7 @@
<xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW" minOccurs="1" maxOccurs="1"/>
<xs:element name="fs" type="workflow:FS" minOccurs="1" maxOccurs="1"/>
<xs:element name="java" type="workflow:JAVA" minOccurs="1" maxOccurs="1"/>
- <!-- <xs:any namespace="##other" minOccurs="1" maxOccurs="1"/>-->
+ <!-- <xs:any namespace="##other" minOccurs="1" maxOccurs="1"/>-->
</xs:choice>
<xs:element name="ok" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
<xs:element name="error" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java
index 6c6a5e5..97dca12 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/bundle/BundleUnmarshallingTest.java
@@ -18,6 +18,9 @@
package org.apache.falcon.oozie.bundle;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.Unmarshaller;
@@ -25,17 +28,16 @@ import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
public class BundleUnmarshallingTest {
@Test
public void testValidBundleUnamrashalling() throws Exception {
- Unmarshaller unmarshaller = JAXBContext.newInstance(org.apache.falcon.oozie.bundle.BUNDLEAPP.class).createUnmarshaller();
+ Unmarshaller unmarshaller = JAXBContext.newInstance(
+ org.apache.falcon.oozie.bundle.BUNDLEAPP.class).createUnmarshaller();
SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-bundle-0.1.xsd"));
unmarshaller.setSchema(schema);
- Object bundle = unmarshaller.unmarshal(new StreamSource(BundleUnmarshallingTest.class.getResourceAsStream("/oozie/xmls/bundle.xml")),
+ Object bundle = unmarshaller.unmarshal(
+ new StreamSource(BundleUnmarshallingTest.class.getResourceAsStream("/oozie/xmls/bundle.xml")),
BUNDLEAPP.class);
BUNDLEAPP bundleApp = ((JAXBElement<BUNDLEAPP>) bundle).getValue();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java
index b7b4440..da8b626 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/coordinator/CoordinatorUnmarshallingTest.java
@@ -17,6 +17,10 @@
*/
package org.apache.falcon.oozie.coordinator;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.xml.sax.SAXException;
+
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
@@ -24,14 +28,8 @@ import javax.xml.bind.Unmarshaller;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import org.xml.sax.SAXException;
-
/**
- *
* Class to test if generated coordinator.xml is valid
- *
*/
public class CoordinatorUnmarshallingTest {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
index b2cf821..169c5d3 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/FalconPostProcessingTest.java
@@ -17,14 +17,6 @@
*/
package org.apache.falcon.oozie.workflow;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.falcon.workflow.FalconPostProcessing;
@@ -34,129 +26,132 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import javax.jms.*;
+
public class FalconPostProcessingTest {
- private String[] args;
- private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
- // private static final String BROKER_URL =
- // "tcp://localhost:61616?daemon=true";
- private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
- private static final String FALCON_TOPIC_NAME = "FALCON.ENTITY.TOPIC";
- private static final String ENTITY_NAME = "agg-coord";
- private BrokerService broker;
-
- private volatile AssertionError error;
-
- @BeforeClass
- public void setup() throws Exception {
- args = new String[] { "-" + Arg.ENTITY_NAME.getOptionName(),
- ENTITY_NAME, "-" + Arg.FEED_NAMES.getOptionName(),
- "click-logs,raw-logs",
- "-" + Arg.FEED_INSTANCE_PATHS.getOptionName(),
- "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
- "-" + Arg.WORKFLOW_ID.getOptionName(), "workflow-01-00",
- "-" + Arg.RUN_ID.getOptionName(), "1",
- "-" + Arg.NOMINAL_TIME.getOptionName(), "2011-01-01-01-00",
- "-" + Arg.TIMESTAMP.getOptionName(), "2012-01-01-01-00",
- "-" + Arg.BRKR_URL.getOptionName(), BROKER_URL,
- "-" + Arg.BRKR_IMPL_CLASS.getOptionName(), (BROKER_IMPL_CLASS),
- "-" + Arg.USER_BRKR_URL.getOptionName(), BROKER_URL,
- "-" + Arg.USER_BRKR_IMPL_CLASS.getOptionName(),
- (BROKER_IMPL_CLASS), "-" + Arg.ENTITY_TYPE.getOptionName(),
- ("process"), "-" + Arg.OPERATION.getOptionName(), ("GENERATE"),
- "-" + Arg.LOG_FILE.getOptionName(), ("/logFile"),
- "-" + Arg.STATUS.getOptionName(), ("SUCCEEDED"),
- "-" + Arg.BRKR_TTL.getOptionName(), "10",
- "-" + Arg.CLUSTER.getOptionName(), "corp",
- "-" + Arg.WF_ENGINE_URL.getOptionName(),
- "http://localhost:11000/oozie/",
- "-" + Arg.LOG_DIR.getOptionName(), "target/log",
- "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id"+"test" };
- broker = new BrokerService();
- broker.addConnector(BROKER_URL);
- broker.setDataDirectory("target/activemq");
- broker.setBrokerName("localhost");
- broker.start();
- }
-
- @AfterClass
- public void tearDown() throws Exception {
- broker.deleteAllMessages();
- broker.stop();
- }
-
- @Test
- public void testProcessMessageCreator() throws Exception {
-
- Thread t = new Thread() {
- @Override
- public void run() {
- try {
- consumer(BROKER_URL, "FALCON." + ENTITY_NAME);
- consumer(BROKER_URL, FALCON_TOPIC_NAME);
- } catch (AssertionError e) {
- error = e;
- } catch (JMSException ignore) {
-
- }
- }
- };
- t.start();
- Thread.sleep(1500);
- new FalconPostProcessing().run(this.args);
- t.join();
- if (error != null) {
- throw error;
- }
- }
-
- private void consumer(String brokerUrl, String topic) throws JMSException {
- ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- brokerUrl);
- Connection connection = connectionFactory.createConnection();
- connection.start();
-
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- Destination destination = session.createTopic(topic);
- MessageConsumer consumer = session.createConsumer(destination);
-
- // wait till you get atleast one message
- MapMessage m;
- for (m = null; m == null;)
- m = (MapMessage) consumer.receive();
- System.out.println("Consumed: " + m.toString());
-
- assertMessage(m);
- if (topic.equals(FALCON_TOPIC_NAME)) {
- Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()),
- "click-logs,raw-logs");
- Assert.assertEquals(
- m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
- "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20");
- } else {
- Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()),
- "click-logs");
- Assert.assertEquals(
- m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
- "/click-logs/10/05/05/00/20");
-
- }
-
- connection.close();
- }
-
- private void assertMessage(MapMessage m) throws JMSException {
- Assert.assertEquals(m.getString(Arg.ENTITY_NAME.getOptionName()),
- "agg-coord");
- Assert.assertEquals(m.getString(Arg.WORKFLOW_ID.getOptionName()),
- "workflow-01-00");
- Assert.assertEquals(m.getString(Arg.RUN_ID.getOptionName()), "1");
- Assert.assertEquals(m.getString(Arg.NOMINAL_TIME.getOptionName()),
- "2011-01-01T01:00Z");
- Assert.assertEquals(m.getString(Arg.TIMESTAMP.getOptionName()),
- "2012-01-01T01:00Z");
- Assert.assertEquals(m.getString(Arg.STATUS.getOptionName()),
- "SUCCEEDED");
- }
+ private String[] args;
+ private static final String BROKER_URL = "vm://localhost?broker.useJmx=false&broker.persistent=true";
+ // private static final String BROKER_URL =
+ // "tcp://localhost:61616?daemon=true";
+ private static final String BROKER_IMPL_CLASS = "org.apache.activemq.ActiveMQConnectionFactory";
+ private static final String FALCON_TOPIC_NAME = "FALCON.ENTITY.TOPIC";
+ private static final String ENTITY_NAME = "agg-coord";
+ private BrokerService broker;
+
+ private volatile AssertionError error;
+
+ @BeforeClass
+ public void setup() throws Exception {
+ args = new String[]{"-" + Arg.ENTITY_NAME.getOptionName(),
+ ENTITY_NAME, "-" + Arg.FEED_NAMES.getOptionName(),
+ "click-logs,raw-logs",
+ "-" + Arg.FEED_INSTANCE_PATHS.getOptionName(),
+ "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20",
+ "-" + Arg.WORKFLOW_ID.getOptionName(), "workflow-01-00",
+ "-" + Arg.RUN_ID.getOptionName(), "1",
+ "-" + Arg.NOMINAL_TIME.getOptionName(), "2011-01-01-01-00",
+ "-" + Arg.TIMESTAMP.getOptionName(), "2012-01-01-01-00",
+ "-" + Arg.BRKR_URL.getOptionName(), BROKER_URL,
+ "-" + Arg.BRKR_IMPL_CLASS.getOptionName(), (BROKER_IMPL_CLASS),
+ "-" + Arg.USER_BRKR_URL.getOptionName(), BROKER_URL,
+ "-" + Arg.USER_BRKR_IMPL_CLASS.getOptionName(),
+ (BROKER_IMPL_CLASS), "-" + Arg.ENTITY_TYPE.getOptionName(),
+ ("process"), "-" + Arg.OPERATION.getOptionName(), ("GENERATE"),
+ "-" + Arg.LOG_FILE.getOptionName(), ("/logFile"),
+ "-" + Arg.STATUS.getOptionName(), ("SUCCEEDED"),
+ "-" + Arg.BRKR_TTL.getOptionName(), "10",
+ "-" + Arg.CLUSTER.getOptionName(), "corp",
+ "-" + Arg.WF_ENGINE_URL.getOptionName(),
+ "http://localhost:11000/oozie/",
+ "-" + Arg.LOG_DIR.getOptionName(), "target/log",
+ "-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test"};
+ broker = new BrokerService();
+ broker.addConnector(BROKER_URL);
+ broker.setDataDirectory("target/activemq");
+ broker.setBrokerName("localhost");
+ broker.start();
+ }
+
+ @AfterClass
+ public void tearDown() throws Exception {
+ broker.deleteAllMessages();
+ broker.stop();
+ }
+
+ @Test
+ public void testProcessMessageCreator() throws Exception {
+
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ try {
+ consumer(BROKER_URL, "FALCON." + ENTITY_NAME);
+ consumer(BROKER_URL, FALCON_TOPIC_NAME);
+ } catch (AssertionError e) {
+ error = e;
+ } catch (JMSException ignore) {
+
+ }
+ }
+ };
+ t.start();
+ Thread.sleep(1500);
+ new FalconPostProcessing().run(this.args);
+ t.join();
+ if (error != null) {
+ throw error;
+ }
+ }
+
+ private void consumer(String brokerUrl, String topic) throws JMSException {
+ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
+ brokerUrl);
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createTopic(topic);
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ // wait till you get atleast one message
+ MapMessage m;
+ for (m = null; m == null; ) {
+ m = (MapMessage) consumer.receive();
+ }
+ System.out.println("Consumed: " + m.toString());
+
+ assertMessage(m);
+ if (topic.equals(FALCON_TOPIC_NAME)) {
+ Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()),
+ "click-logs,raw-logs");
+ Assert.assertEquals(
+ m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
+ "/click-logs/10/05/05/00/20,/raw-logs/10/05/05/00/20");
+ } else {
+ Assert.assertEquals(m.getString(Arg.FEED_NAMES.getOptionName()),
+ "click-logs");
+ Assert.assertEquals(
+ m.getString(Arg.FEED_INSTANCE_PATHS.getOptionName()),
+ "/click-logs/10/05/05/00/20");
+
+ }
+
+ connection.close();
+ }
+
+ private void assertMessage(MapMessage m) throws JMSException {
+ Assert.assertEquals(m.getString(Arg.ENTITY_NAME.getOptionName()),
+ "agg-coord");
+ Assert.assertEquals(m.getString(Arg.WORKFLOW_ID.getOptionName()),
+ "workflow-01-00");
+ Assert.assertEquals(m.getString(Arg.RUN_ID.getOptionName()), "1");
+ Assert.assertEquals(m.getString(Arg.NOMINAL_TIME.getOptionName()),
+ "2011-01-01T01:00Z");
+ Assert.assertEquals(m.getString(Arg.TIMESTAMP.getOptionName()),
+ "2012-01-01T01:00Z");
+ Assert.assertEquals(m.getString(Arg.STATUS.getOptionName()),
+ "SUCCEEDED");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java b/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java
index e86e177..e17d377 100644
--- a/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java
+++ b/oozie/src/test/java/org/apache/falcon/oozie/workflow/WorkflowUnmarshallingTest.java
@@ -18,6 +18,10 @@
package org.apache.falcon.oozie.workflow;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import org.xml.sax.SAXException;
+
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
@@ -25,20 +29,18 @@ import javax.xml.bind.Unmarshaller;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-import org.xml.sax.SAXException;
-
public class WorkflowUnmarshallingTest {
@Test
public void testValidWorkflowUnamrashalling() throws JAXBException, SAXException {
- Unmarshaller unmarshaller = JAXBContext.newInstance(org.apache.falcon.oozie.workflow.WORKFLOWAPP.class).createUnmarshaller();
+ Unmarshaller unmarshaller = JAXBContext.newInstance(
+ org.apache.falcon.oozie.workflow.WORKFLOWAPP.class).createUnmarshaller();
SchemaFactory schemaFactory = SchemaFactory.newInstance("http://www.w3.org/2001/XMLSchema");
Schema schema = schemaFactory.newSchema(this.getClass().getResource("/oozie-workflow-0.3.xsd"));
unmarshaller.setSchema(schema);
- JAXBElement<WORKFLOWAPP> workflowApp = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(WorkflowUnmarshallingTest.class
- .getResourceAsStream("/oozie/xmls/workflow.xml"));
+ JAXBElement<WORKFLOWAPP> workflowApp = (JAXBElement<WORKFLOWAPP>) unmarshaller.unmarshal(
+ WorkflowUnmarshallingTest.class
+ .getResourceAsStream("/oozie/xmls/workflow.xml"));
WORKFLOWAPP app = workflowApp.getValue();
Assert.assertEquals(app.getName(), "java-main-wf");
Assert.assertEquals(((ACTION) app.getDecisionOrForkOrJoin().get(0)).getName(), "java-node");
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java
----------------------------------------------------------------------
diff --git a/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java b/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java
index 327d95c..897a86d 100644
--- a/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java
+++ b/oozie/src/test/java/org/apache/oozie/client/CustomOozieClientTest.java
@@ -24,7 +24,7 @@ import java.util.Properties;
public class CustomOozieClientTest {
- @Test (enabled = false)
+ @Test(enabled = false)
public void testGetConfiguration() throws Exception {
CustomOozieClient client = new CustomOozieClient("http://localhost:11000/oozie");
Properties props = client.getConfiguration();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/test/resources/oozie/xmls/bundle.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/oozie/xmls/bundle.xml b/oozie/src/test/resources/oozie/xmls/bundle.xml
index 3d75e3d..18f6215 100644
--- a/oozie/src/test/resources/oozie/xmls/bundle.xml
+++ b/oozie/src/test/resources/oozie/xmls/bundle.xml
@@ -15,18 +15,18 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
-<bundle-app name='bundle-app' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='uri:oozie:bundle:0.1'>
- <coordinator name='coord-1'>
- <app-path>${nameNode}/user/${userName}/${examplesRoot}/apps/aggregator/coordinator.xml</app-path>
- <configuration>
- <property>
- <name>start</name>
- <value>${start}</value>
- </property>
- <property>
- <name>end</name>
- <value>${end}</value>
- </property>
- </configuration>
- </coordinator>
+<bundle-app name='bundle-app' xmlns='uri:oozie:bundle:0.1'>
+ <coordinator name='coord-1'>
+ <app-path>${nameNode}/user/${userName}/${examplesRoot}/apps/aggregator/coordinator.xml</app-path>
+ <configuration>
+ <property>
+ <name>start</name>
+ <value>${start}</value>
+ </property>
+ <property>
+ <name>end</name>
+ <value>${end}</value>
+ </property>
+ </configuration>
+ </coordinator>
</bundle-app>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/test/resources/oozie/xmls/coordinator.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/oozie/xmls/coordinator.xml b/oozie/src/test/resources/oozie/xmls/coordinator.xml
index f975432..64829c8 100644
--- a/oozie/src/test/resources/oozie/xmls/coordinator.xml
+++ b/oozie/src/test/resources/oozie/xmls/coordinator.xml
@@ -17,40 +17,42 @@
limitations under the License.
-->
-<coordinator-app end="" frequency="" name="test" start="" timezone="" xmlns="uri:oozie:coordinator:0.3" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="uri:oozie:coordinator:0.2 ../../main/resources/coordinator.xsd ">
- <controls>
- <timeout>timeout</timeout>
- <concurrency>concurrency</concurrency>
- <execution>execution</execution>
- <throttle>throttle</throttle>
- </controls>
- <datasets>
- <include>include</include>
- <dataset frequency="" initial-instance="" name="dataset1" timezone="">
- <uri-template>uri-template</uri-template>
- <done-flag>done-flag</done-flag>
- </dataset>
- </datasets>
- <input-events>
- <data-in dataset="dataset1" name="data-in">
- <instance>instance</instance>
- </data-in>
- </input-events>
- <output-events>
- <data-out dataset="dataset1" name="data-out1">
- <instance>instance</instance>
- </data-out>
- </output-events>
- <action>
- <workflow>
- <app-path>app-path</app-path>
- <configuration>
- <property>
- <name>name</name>
- <value>value</value>
- <description>description</description>
- </property>
- </configuration>
- </workflow>
- </action>
+<coordinator-app end="" frequency="" name="test" start="" timezone="" xmlns="uri:oozie:coordinator:0.3"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="/../main/resources/coordinator.xsd">
+ <controls>
+ <timeout>timeout</timeout>
+ <concurrency>concurrency</concurrency>
+ <execution>execution</execution>
+ <throttle>throttle</throttle>
+ </controls>
+ <datasets>
+ <include>include</include>
+ <dataset frequency="" initial-instance="" name="dataset1" timezone="">
+ <uri-template>uri-template</uri-template>
+ <done-flag>done-flag</done-flag>
+ </dataset>
+ </datasets>
+ <input-events>
+ <data-in dataset="dataset1" name="data-in">
+ <instance>instance</instance>
+ </data-in>
+ </input-events>
+ <output-events>
+ <data-out dataset="dataset1" name="data-out1">
+ <instance>instance</instance>
+ </data-out>
+ </output-events>
+ <action>
+ <workflow>
+ <app-path>app-path</app-path>
+ <configuration>
+ <property>
+ <name>name</name>
+ <value>value</value>
+ <description>description</description>
+ </property>
+ </configuration>
+ </workflow>
+ </action>
</coordinator-app>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/oozie/src/test/resources/oozie/xmls/workflow.xml
----------------------------------------------------------------------
diff --git a/oozie/src/test/resources/oozie/xmls/workflow.xml b/oozie/src/test/resources/oozie/xmls/workflow.xml
index 1ef7bf5..5d14413 100644
--- a/oozie/src/test/resources/oozie/xmls/workflow.xml
+++ b/oozie/src/test/resources/oozie/xmls/workflow.xml
@@ -25,7 +25,7 @@
<main-class>org.apache.falcon.messaging.MessageProducer</main-class>
<arg>${wf:name()}</arg>
<arg>${wf:appPath()}</arg>
- <arg>${timestamp()}</arg>
+ <arg>${timestamp()}</arg>
</java>
<ok to="end"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/FalconWebException.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/FalconWebException.java b/prism/src/main/java/org/apache/falcon/FalconWebException.java
index 50e6772..c0d41ca 100644
--- a/prism/src/main/java/org/apache/falcon/FalconWebException.java
+++ b/prism/src/main/java/org/apache/falcon/FalconWebException.java
@@ -32,7 +32,7 @@ public class FalconWebException extends WebApplicationException {
private static final Logger LOG = Logger.getLogger(FalconWebException.class);
public static FalconWebException newException(Throwable e,
- Response.Status status) {
+ Response.Status status) {
LOG.error("Failure reason", e);
return newException(e.getMessage() + "\n" + getAddnInfo(e), status);
}
@@ -44,14 +44,14 @@ public class FalconWebException extends WebApplicationException {
public static FalconWebException newException(APIResult result,
- Response.Status status) {
+ Response.Status status) {
LOG.error("Action failed: " + status + "\nError:" + result.getMessage());
return new FalconWebException(Response.status(status).
entity(result).type(MediaType.TEXT_XML_TYPE).build());
}
public static FalconWebException newException(String message,
- Response.Status status) {
+ Response.Status status) {
LOG.error("Action failed: " + status + "\nError:" + message);
APIResult result = new APIResult(APIResult.Status.FAILED, message);
return new FalconWebException(Response.status(status).
@@ -65,8 +65,9 @@ public class FalconWebException extends WebApplicationException {
}
private static String getMessage(Throwable e) {
- if(StringUtils.isEmpty(e.getMessage()))
+ if (StringUtils.isEmpty(e.getMessage())) {
return e.getClass().getName();
+ }
return e.getMessage();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/listener/ContextStartupListener.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/listener/ContextStartupListener.java b/prism/src/main/java/org/apache/falcon/listener/ContextStartupListener.java
index 3b75caa..54a9b4a 100644
--- a/prism/src/main/java/org/apache/falcon/listener/ContextStartupListener.java
+++ b/prism/src/main/java/org/apache/falcon/listener/ContextStartupListener.java
@@ -18,12 +18,6 @@
package org.apache.falcon.listener;
-import java.util.Map;
-import java.util.Properties;
-
-import javax.servlet.ServletContextEvent;
-import javax.servlet.ServletContextListener;
-
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.service.ServiceInitializer;
@@ -32,6 +26,11 @@ import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
import org.apache.log4j.Logger;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+import java.util.Map;
+import java.util.Properties;
+
public class ContextStartupListener implements ServletContextListener {
private static Logger LOG = Logger.getLogger(ContextStartupListener.class);
@@ -48,7 +47,7 @@ public class ContextStartupListener implements ServletContextListener {
LOG.info("Initializing runtime properties ...");
RuntimeProperties.get();
-
+
try {
startupServices.initialize();
ConfigurationStore.get();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java b/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
index c98bf53..2b99e70 100644
--- a/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
+++ b/prism/src/main/java/org/apache/falcon/plugin/ChainableMonitoringPlugin.java
@@ -46,7 +46,7 @@ public class ChainableMonitoringPlugin extends AbstractFalconAspect implements M
LOG.info("Registered Monitoring Plugin " + pluginClass);
}
} catch (FalconException e) {
- plugins = Arrays.asList((MonitoringPlugin)new LoggingPlugin());
+ plugins = Arrays.asList((MonitoringPlugin) new LoggingPlugin());
LOG.error("Unable to initialize monitoring plugins: " + pluginClasses, e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index ff18864..a272957 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -18,20 +18,8 @@
package org.apache.falcon.resource;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.core.Response;
-
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.lang.ObjectUtils;
-import org.apache.hadoop.io.IOUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
@@ -54,8 +42,15 @@ import org.apache.falcon.util.DeploymentUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
+import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Logger;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
public abstract class AbstractEntityManager {
private static final Logger LOG = Logger.getLogger(AbstractEntityManager.class);
private static final Logger AUDIT = Logger.getLogger("AUDIT");
@@ -74,14 +69,16 @@ public abstract class AbstractEntityManager {
protected void checkColo(String colo) throws FalconWebException {
if (!DeploymentUtil.getCurrentColo().equals(colo)) {
- throw FalconWebException.newException("Current colo (" + DeploymentUtil.getCurrentColo() + ") is not " + colo,
+ throw FalconWebException.newException(
+ "Current colo (" + DeploymentUtil.getCurrentColo() + ") is not " + colo,
Response.Status.BAD_REQUEST);
}
}
protected Set<String> getAllColos() {
- if (DeploymentUtil.isEmbeddedMode())
+ if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
+ }
String[] colos = RuntimeProperties.get().getProperty("all.colos", DeploymentUtil.getDefaultColo()).split(",");
return new HashSet<String>(Arrays.asList(colos));
}
@@ -95,28 +92,32 @@ public abstract class AbstractEntityManager {
}
return colos;
}
-
+
protected Set<String> getApplicableColos(String type, String name) throws FalconWebException {
try {
- if (DeploymentUtil.isEmbeddedMode())
+ if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
+ }
- if (EntityType.valueOf(type.toUpperCase()) == EntityType.CLUSTER)
+ if (EntityType.valueOf(type.toUpperCase()) == EntityType.CLUSTER) {
return getAllColos();
+ }
return getApplicableColos(type, EntityUtil.getEntity(type, name));
} catch (FalconException e) {
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
-
+
protected Set<String> getApplicableColos(String type, Entity entity) throws FalconWebException {
try {
- if (DeploymentUtil.isEmbeddedMode())
+ if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
+ }
- if (EntityType.valueOf(type.toUpperCase()) == EntityType.CLUSTER)
+ if (EntityType.valueOf(type.toUpperCase()) == EntityType.CLUSTER) {
return getAllColos();
+ }
Set<String> clusters = EntityUtil.getClustersDefined(entity);
Set<String> colos = new HashSet<String>();
@@ -134,16 +135,13 @@ public abstract class AbstractEntityManager {
* Submit a new entity. Entities can be of type feed, process or data end
* points. Entity definitions are validated structurally against schema and
* subsequently for other rules before they are admitted into the system
- *
+ * <p/>
* Entity name acts as the key and an entity once added, can't be added
* again unless deleted.
- *
- * @param request
- * - Servlet Request
- * @param type
- * - entity type - feed, process or data end point
- * @param colo
- * - applicable colo
+ *
+ * @param request - Servlet Request
+ * @param type - entity type - feed, process or data end point
+ * @param colo - applicable colo
* @return result of the operation
*/
public APIResult submit(HttpServletRequest request, String type, String colo) {
@@ -162,7 +160,7 @@ public abstract class AbstractEntityManager {
/**
* Post an entity XML with entity type. Validates the XML which can be
* Process, Feed or Dataendpoint
- *
+ *
* @param type
* @return APIResule -Succeeded or Failed
*/
@@ -171,7 +169,8 @@ public abstract class AbstractEntityManager {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Entity entity = deserializeEntity(request, entityType);
validate(entity);
- return new APIResult(APIResult.Status.SUCCEEDED, "Validated successfully (" + entityType + ") " + entity.getName());
+ return new APIResult(APIResult.Status.SUCCEEDED,
+ "Validated successfully (" + entityType + ") " + entity.getName());
} catch (Throwable e) {
LOG.error("Validation failed for entity (" + type + ") ", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
@@ -181,7 +180,7 @@ public abstract class AbstractEntityManager {
/**
* Deletes a scheduled entity, a deleted entity is removed completely from
* execution pool.
- *
+ *
* @param type
* @param entity
* @return APIResult
@@ -204,10 +203,12 @@ public abstract class AbstractEntityManager {
configStore.remove(entityType, entity);
} catch (EntityNotRegisteredException e) { // already deleted
- return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") doesn't exist. Nothing to do");
+ return new APIResult(APIResult.Status.SUCCEEDED,
+ entity + "(" + type + ") doesn't exist. Nothing to do");
}
- return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") removed successfully " + removedFromEngine);
+ return new APIResult(APIResult.Status.SUCCEEDED,
+ entity + "(" + type + ") removed successfully " + removedFromEngine);
} catch (Throwable e) {
LOG.error("Unable to reach workflow engine for deletion or " + "deletion failed", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
@@ -229,7 +230,7 @@ public abstract class AbstractEntityManager {
if (!EntityUtil.equals(oldEntity, newEntity)) {
configStore.initiateUpdate(newEntity);
//Update in workflow engine
- if(! DeploymentUtil.isPrism()) {
+ if (!DeploymentUtil.isPrism()) {
Set<String> oldClusters = EntityUtil.getClustersDefinedInColos(oldEntity);
Set<String> newClusters = EntityUtil.getClustersDefinedInColos(newEntity);
newClusters.retainAll(oldClusters); //common clusters for update
@@ -238,11 +239,11 @@ public abstract class AbstractEntityManager {
for (String cluster : newClusters) {
getWorkflowEngine().update(oldEntity, newEntity, cluster);
}
- for(String cluster:oldClusters) {
+ for (String cluster : oldClusters) {
getWorkflowEngine().delete(oldEntity, cluster);
}
- }
-
+ }
+
configStore.update(entityType, newEntity);
}
@@ -256,11 +257,14 @@ public abstract class AbstractEntityManager {
}
private void validateUpdate(Entity oldEntity, Entity newEntity) throws FalconException {
- if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity))
- throw new FalconException(oldEntity.toShortString() + " can't be updated with " + newEntity.toShortString());
+ if (oldEntity.getEntityType() != newEntity.getEntityType() || !oldEntity.equals(newEntity)) {
+ throw new FalconException(
+ oldEntity.toShortString() + " can't be updated with " + newEntity.toShortString());
+ }
- if (oldEntity.getEntityType() == EntityType.CLUSTER)
+ if (oldEntity.getEntityType() == EntityType.CLUSTER) {
throw new FalconException("Update not supported for clusters");
+ }
String[] props = oldEntity.getEntityType().getImmutableProperties();
for (String prop : props) {
@@ -271,8 +275,9 @@ public abstract class AbstractEntityManager {
} catch (Exception e) {
throw new FalconException(e);
}
- if (!ObjectUtils.equals(oldProp, newProp))
+ if (!ObjectUtils.equals(oldProp, newProp)) {
throw new ValidationException(oldEntity.toShortString() + ": " + prop + " can't be changed");
+ }
}
}
@@ -283,23 +288,27 @@ public abstract class AbstractEntityManager {
for (Pair<String, EntityType> ref : referencedBy) {
messages.append(ref).append("\n");
}
- throw new FalconException(entity.getName() + "(" + entity.getEntityType() + ") cant " + "be removed as it is referred by "
- + messages);
+ throw new FalconException(
+ entity.getName() + "(" + entity.getEntityType() + ") cant " + "be removed as it is referred by "
+ + messages);
}
}
- protected synchronized Entity submitInternal(HttpServletRequest request, String type) throws IOException, FalconException {
+ protected synchronized Entity submitInternal(HttpServletRequest request, String type)
+ throws IOException, FalconException {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Entity entity = deserializeEntity(request, entityType);
Entity existingEntity = configStore.get(entityType, entity.getName());
if (existingEntity != null) {
- if (EntityUtil.equals(existingEntity, entity))
+ if (EntityUtil.equals(existingEntity, entity)) {
return existingEntity;
+ }
- throw new EntityAlreadyExistsException(entity.toShortString() + " already registered with configuration store. "
- + "Can't be submitted again. Try removing before submitting.");
+ throw new EntityAlreadyExistsException(
+ entity.toShortString() + " already registered with configuration store. "
+ + "Can't be submitted again. Try removing before submitting.");
}
validate(entity);
@@ -308,7 +317,8 @@ public abstract class AbstractEntityManager {
return entity;
}
- protected Entity deserializeEntity(HttpServletRequest request, EntityType entityType) throws IOException, FalconException {
+ protected Entity deserializeEntity(HttpServletRequest request, EntityType entityType)
+ throws IOException, FalconException {
EntityParser<?> entityParser = EntityParserFactory.getParser(entityType);
InputStream xmlStream = request.getInputStream();
@@ -330,7 +340,7 @@ public abstract class AbstractEntityManager {
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({"unchecked", "rawtypes"})
private void validate(Entity entity) throws FalconException {
EntityParser entityParser = EntityParserFactory.getParser(entity.getEntityType());
entityParser.validate(entity);
@@ -356,7 +366,7 @@ public abstract class AbstractEntityManager {
/**
* Returns the status of requested entity.
- *
+ *
* @param type
* @param entity
* @return String
@@ -395,7 +405,7 @@ public abstract class AbstractEntityManager {
/**
* Returns dependencies.
- *
+ *
* @param type
* @param entity
* @return EntityList
@@ -406,7 +416,7 @@ public abstract class AbstractEntityManager {
Entity entityObj = EntityUtil.getEntity(type, entity);
Set<Entity> dependents = EntityGraph.get().getDependents(entityObj);
Entity[] entities = dependents.toArray(new Entity[dependents.size()]);
- return new EntityList(entities == null ? new Entity[] {} : entities);
+ return new EntityList(entities == null ? new Entity[]{} : entities);
} catch (Exception e) {
LOG.error("Unable to get dependencies for entity " + entity + "(" + type + ")", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
@@ -415,7 +425,7 @@ public abstract class AbstractEntityManager {
/**
* Returns the list of entities registered of a given type.
- *
+ *
* @param type
* @return String
*/
@@ -424,7 +434,7 @@ public abstract class AbstractEntityManager {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
Collection<String> entityNames = configStore.getEntities(entityType);
if (entityNames == null || entityNames.equals("")) {
- return new EntityList(new Entity[] {});
+ return new EntityList(new Entity[]{});
}
Entity[] entities = new Entity[entityNames.size()];
int index = 0;
@@ -440,7 +450,7 @@ public abstract class AbstractEntityManager {
/**
* Returns the entity definition as an XML based on name
- *
+ *
* @param type
* @param entityName
* @return String
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 6b3de49..8097c43 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -18,15 +18,6 @@
package org.apache.falcon.resource;
-import java.io.IOException;
-import java.util.Date;
-import java.util.Properties;
-import java.util.Set;
-
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.core.Response;
-
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
@@ -41,9 +32,17 @@ import org.apache.falcon.resource.InstancesResult.Instance;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.log4j.Logger;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.util.Date;
+import java.util.Properties;
+import java.util.Set;
+
public abstract class AbstractInstanceManager extends AbstractEntityManager {
private static final Logger LOG = Logger.getLogger(AbstractInstanceManager.class);
-
+
protected void checkType(String type) {
if (StringUtils.isEmpty(type)) {
throw FalconWebException.newInstanceException("entity type is empty",
@@ -51,7 +50,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
} else {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
if (entityType == EntityType.CLUSTER) {
- throw FalconWebException.newInstanceException("Instance management functions don't apply to Cluster entities",
+ throw FalconWebException.newInstanceException(
+ "Instance management functions don't apply to Cluster entities",
Response.Status.BAD_REQUEST);
}
}
@@ -73,45 +73,45 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
public InstancesResult getStatus(String type, String entity, String startStr, String endStr,
- String colo) {
+ String colo) {
checkColo(colo);
checkType(type);
try {
- validateParams(type, entity, startStr, endStr);
-
- Date start = EntityUtil.parseDateUTC(startStr);
- Date end = getEndDate(start, endStr);
- Entity entityObject = EntityUtil.getEntity(type, entity);
-
- AbstractWorkflowEngine wfEngine = getWorkflowEngine();
- return wfEngine.getStatus(
- entityObject, start, end);
- } catch (Throwable e) {
- LOG.error("Failed to get instances status", e);
- throw FalconWebException
- .newInstanceException(e, Response.Status.BAD_REQUEST);
- }
- }
-
- public InstancesResult getLogs(String type, String entity, String startStr,
- String endStr, String colo, String runId){
-
- try {
- // TODO getStatus does all validations and filters clusters
- InstancesResult result = getStatus(type, entity, startStr, endStr,
- colo);
- LogProvider logProvider = new LogProvider();
- Entity entityObject = EntityUtil.getEntity(type, entity);
- for (Instance instance : result.getInstances()) {
- logProvider.populateLogUrls(entityObject, instance, runId);
- }
- return result;
- } catch (Exception e) {
- LOG.error("Failed to get logs for instances", e);
- throw FalconWebException.newInstanceException(e,
- Response.Status.BAD_REQUEST);
- }
- }
+ validateParams(type, entity, startStr, endStr);
+
+ Date start = EntityUtil.parseDateUTC(startStr);
+ Date end = getEndDate(start, endStr);
+ Entity entityObject = EntityUtil.getEntity(type, entity);
+
+ AbstractWorkflowEngine wfEngine = getWorkflowEngine();
+ return wfEngine.getStatus(
+ entityObject, start, end);
+ } catch (Throwable e) {
+ LOG.error("Failed to get instances status", e);
+ throw FalconWebException
+ .newInstanceException(e, Response.Status.BAD_REQUEST);
+ }
+ }
+
+ public InstancesResult getLogs(String type, String entity, String startStr,
+ String endStr, String colo, String runId) {
+
+ try {
+ // TODO getStatus does all validations and filters clusters
+ InstancesResult result = getStatus(type, entity, startStr, endStr,
+ colo);
+ LogProvider logProvider = new LogProvider();
+ Entity entityObject = EntityUtil.getEntity(type, entity);
+ for (Instance instance : result.getInstances()) {
+ logProvider.populateLogUrls(entityObject, instance, runId);
+ }
+ return result;
+ } catch (Exception e) {
+ LOG.error("Failed to get logs for instances", e);
+ throw FalconWebException.newInstanceException(e,
+ Response.Status.BAD_REQUEST);
+ }
+ }
public InstancesResult killInstance(HttpServletRequest request,
String type, String entity, String startStr, String endStr, String colo) {
@@ -121,11 +121,11 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
try {
audit(request, entity, type, "INSTANCE_KILL");
validateParams(type, entity, startStr, endStr);
-
+
Date start = EntityUtil.parseDateUTC(startStr);
- Date end = getEndDate(start, endStr);
+ Date end = getEndDate(start, endStr);
Entity entityObject = EntityUtil.getEntity(type, entity);
-
+
Properties props = getProperties(request);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
return wfEngine.killInstances(entityObject, start, end, props);
@@ -143,11 +143,11 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
try {
audit(request, entity, type, "INSTANCE_SUSPEND");
validateParams(type, entity, startStr, endStr);
-
+
Date start = EntityUtil.parseDateUTC(startStr);
- Date end = getEndDate(start, endStr);
+ Date end = getEndDate(start, endStr);
Entity entityObject = EntityUtil.getEntity(type, entity);
-
+
Properties props = getProperties(request);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
return wfEngine.suspendInstances(entityObject, start, end, props);
@@ -165,11 +165,11 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
try {
audit(request, entity, type, "INSTANCE_RESUME");
validateParams(type, entity, startStr, endStr);
-
+
Date start = EntityUtil.parseDateUTC(startStr);
- Date end = getEndDate(start, endStr);
+ Date end = getEndDate(start, endStr);
Entity entityObject = EntityUtil.getEntity(type, entity);
-
+
Properties props = getProperties(request);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
return wfEngine.resumeInstances(entityObject, start, end, props);
@@ -180,16 +180,16 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
public InstancesResult reRunInstance(String type, String entity, String startStr, String endStr,
- HttpServletRequest request, String colo) {
+ HttpServletRequest request, String colo) {
checkColo(colo);
checkType(type);
try {
audit(request, entity, type, "INSTANCE_RERUN");
validateParams(type, entity, startStr, endStr);
-
+
Date start = EntityUtil.parseDateUTC(startStr);
- Date end = getEndDate(start, endStr);
+ Date end = getEndDate(start, endStr);
Entity entityObject = EntityUtil.getEntity(type, entity);
Properties props = getProperties(request);
@@ -203,7 +203,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
private Properties getProperties(HttpServletRequest request) throws IOException {
Properties props = new Properties();
- ServletInputStream xmlStream = request==null?null:request.getInputStream();
+ ServletInputStream xmlStream = request == null ? null : request.getInputStream();
if (xmlStream != null) {
if (xmlStream.markSupported()) {
xmlStream.mark(XML_DEBUG_LEN); // mark up to debug len
@@ -217,11 +217,12 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Date end;
if (StringUtils.isEmpty(endStr)) {
end = new Date(start.getTime() + 1000); // next sec
- } else
+ } else {
end = EntityUtil.parseDateUTC(endStr);
+ }
return end;
}
-
+
private void validateParams(String type, String entity, String startStr, String endStr) throws FalconException {
validateNotEmpty("entityType", type);
validateNotEmpty("entityName", entity);
@@ -231,13 +232,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
validateDateRange(entityObject, startStr, endStr);
}
- private void validateDateRange(Entity entity, String start, String end)
- throws FalconException {
+ private void validateDateRange(Entity entity, String start, String end)
+ throws FalconException {
Set<String> clusters = EntityUtil.getClustersDefined(entity);
Pair<Date, String> clusterMinStartDate = null;
Pair<Date, String> clusterMaxEndDate = null;
for (String cluster : clusters) {
- if (clusterMinStartDate == null || clusterMinStartDate.first.after(EntityUtil.getStartTime(entity, cluster))) {
+ if (clusterMinStartDate == null || clusterMinStartDate.first.after(
+ EntityUtil.getStartTime(entity, cluster))) {
clusterMinStartDate = Pair.of(EntityUtil.getStartTime(entity, cluster), cluster);
}
if (clusterMaxEndDate == null || clusterMaxEndDate.first.before(EntityUtil.getEndTime(entity, cluster))) {
@@ -245,40 +247,46 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
}
- validateDateRangeFor(entity, clusterMinStartDate, clusterMaxEndDate,
- start, end);
- }
-
- private void validateDateRangeFor(Entity entity, Pair<Date,String> clusterMinStart,
- Pair<Date,String> clusterMaxEnd, String start, String end) throws FalconException {
-
- Date instStart = EntityUtil.parseDateUTC(start);
- if (instStart.before(clusterMinStart.first))
- throw new ValidationException("Start date " + start + " is before "
- + entity.getEntityType() + "'s start "
- + SchemaHelper.formatDateUTC(clusterMinStart.first)
- + " for cluster " + clusterMinStart.second);
-
- if (StringUtils.isNotEmpty(end)) {
- Date instEnd = EntityUtil.parseDateUTC(end);
- if (instStart.after(instEnd))
- throw new ValidationException("Start date " + start
- + " is after end date " + end);
-
- if (instEnd.after(clusterMaxEnd.first))
- throw new ValidationException("End date " + end + " is after "
- + entity.getEntityType() + "'s end "
- + SchemaHelper.formatDateUTC(clusterMaxEnd.first)
- + " for cluster " + clusterMaxEnd.second);
- } else if (instStart.after(clusterMaxEnd.first))
- throw new ValidationException("Start date " + start + " is after "
- + entity.getEntityType() + "'s end "
- + SchemaHelper.formatDateUTC(clusterMaxEnd.first)
- + " for cluster " + clusterMaxEnd.second);
+ validateDateRangeFor(entity, clusterMinStartDate, clusterMaxEndDate,
+ start, end);
+ }
+
+ private void validateDateRangeFor(Entity entity, Pair<Date, String> clusterMinStart,
+ Pair<Date, String> clusterMaxEnd, String start, String end)
+ throws FalconException {
+
+ Date instStart = EntityUtil.parseDateUTC(start);
+ if (instStart.before(clusterMinStart.first)) {
+ throw new ValidationException("Start date " + start + " is before "
+ + entity.getEntityType() + "'s start "
+ + SchemaHelper.formatDateUTC(clusterMinStart.first)
+ + " for cluster " + clusterMinStart.second);
+ }
+
+ if (StringUtils.isNotEmpty(end)) {
+ Date instEnd = EntityUtil.parseDateUTC(end);
+ if (instStart.after(instEnd)) {
+ throw new ValidationException("Start date " + start
+ + " is after end date " + end);
+ }
+
+ if (instEnd.after(clusterMaxEnd.first)) {
+ throw new ValidationException("End date " + end + " is after "
+ + entity.getEntityType() + "'s end "
+ + SchemaHelper.formatDateUTC(clusterMaxEnd.first)
+ + " for cluster " + clusterMaxEnd.second);
+ }
+ } else if (instStart.after(clusterMaxEnd.first)) {
+ throw new ValidationException("Start date " + start + " is after "
+ + entity.getEntityType() + "'s end "
+ + SchemaHelper.formatDateUTC(clusterMaxEnd.first)
+ + " for cluster " + clusterMaxEnd.second);
+ }
}
private void validateNotEmpty(String field, String param) throws ValidationException {
- if (StringUtils.isEmpty(param))
+ if (StringUtils.isEmpty(param)) {
throw new ValidationException("Parameter " + field + " is empty");
- }
+ }
+ }
}