You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/04/26 17:50:33 UTC
[17/47] Fixes for Checkstyle
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
index d8751a2..ba618ef 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ProcessEntityParser.java
@@ -18,35 +18,23 @@
package org.apache.falcon.entity.parser;
-import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
-import org.apache.falcon.entity.v0.process.Input;
-import org.apache.falcon.entity.v0.process.Inputs;
-import org.apache.falcon.entity.v0.process.LateInput;
-import org.apache.falcon.entity.v0.process.Output;
-import org.apache.falcon.entity.v0.process.Outputs;
+import org.apache.falcon.entity.v0.process.*;
import org.apache.falcon.entity.v0.process.Process;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.net.ConnectException;
+import java.util.*;
/**
* Concrete Parser which has XML parsing and validation logic for Process XML.
- *
*/
public class ProcessEntityParser extends EntityParser<Process> {
@@ -56,17 +44,18 @@ public class ProcessEntityParser extends EntityParser<Process> {
@Override
public void validate(Process process) throws FalconException {
- if(process.getTimezone() == null)
+ if (process.getTimezone() == null) {
process.setTimezone(TimeZone.getTimeZone("UTC"));
+ }
// check if dependent entities exists
Set<String> clusters = new HashSet<String>();
for (org.apache.falcon.entity.v0.process.Cluster cluster : process.getClusters().getClusters()) {
String clusterName = cluster.getName();
- if (!clusters.add(cluster.getName())) {
- throw new ValidationException("Cluster: " + cluster.getName()
- + " is defined more than once for process: "+process.getName());
- }
+ if (!clusters.add(cluster.getName())) {
+ throw new ValidationException("Cluster: " + cluster.getName()
+ + " is defined more than once for process: " + process.getName());
+ }
validateEntityExists(EntityType.CLUSTER, clusterName);
validateProcessValidity(cluster.getValidity().getStart(), cluster.getValidity().getEnd());
validateHDFSpaths(process, clusterName);
@@ -98,25 +87,28 @@ public class ProcessEntityParser extends EntityParser<Process> {
}
private void validateHDFSpaths(Process process, String clusterName) throws FalconException {
- org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER, clusterName);
+ org.apache.falcon.entity.v0.cluster.Cluster cluster = ConfigurationStore.get().get(EntityType.CLUSTER,
+ clusterName);
String workflowPath = process.getWorkflow().getPath();
- String libPath=process.getWorkflow().getLib();
+ String libPath = process.getWorkflow().getLib();
String nameNode = getNameNode(cluster, clusterName);
try {
Configuration configuration = new Configuration();
configuration.set("fs.default.name", nameNode);
FileSystem fs = FileSystem.get(configuration);
if (!fs.exists(new Path(workflowPath))) {
- throw new ValidationException("Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
+ throw new ValidationException(
+ "Workflow path: " + workflowPath + " does not exists in HDFS: " + nameNode);
}
-
- if (libPath!=null && !fs.exists(new Path(libPath))) {
+
+ if (libPath != null && !fs.exists(new Path(libPath))) {
throw new ValidationException("Lib path: " + libPath + " does not exists in HDFS: " + nameNode);
}
} catch (ValidationException e) {
throw new ValidationException(e);
} catch (ConnectException e) {
- throw new ValidationException("Unable to connect to Namenode: " + nameNode + " referenced in cluster: " + clusterName);
+ throw new ValidationException(
+ "Unable to connect to Namenode: " + nameNode + " referenced in cluster: " + clusterName);
} catch (Exception e) {
throw new FalconException(e);
}
@@ -125,8 +117,9 @@ public class ProcessEntityParser extends EntityParser<Process> {
private String getNameNode(Cluster cluster, String clusterName) throws ValidationException {
// cluster should never be null as it is validated while submitting
// feeds.
- if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme()==null) {
- throw new ValidationException("Cannot get valid nameNode scheme from write interface of cluster: " + clusterName);
+ if (new Path(ClusterHelper.getStorageUrl(cluster)).toUri().getScheme() == null) {
+ throw new ValidationException(
+ "Cannot get valid nameNode scheme from write interface of cluster: " + clusterName);
}
return ClusterHelper.getStorageUrl(cluster);
}
@@ -134,7 +127,8 @@ public class ProcessEntityParser extends EntityParser<Process> {
private void validateProcessValidity(Date start, Date end) throws FalconException {
try {
if (!start.before(end)) {
- throw new ValidationException("Process start time: " + start + " should be before process end time: " + end);
+ throw new ValidationException(
+ "Process start time: " + start + " should be before process end time: " + end);
}
} catch (ValidationException e) {
throw new ValidationException(e);
@@ -145,15 +139,15 @@ public class ProcessEntityParser extends EntityParser<Process> {
private void validateDatasetName(Inputs inputs, Outputs outputs) throws ValidationException {
Set<String> datasetNames = new HashSet<String>();
- if(inputs != null) {
+ if (inputs != null) {
for (Input input : inputs.getInputs()) {
if (!datasetNames.add(input.getName())) {
throw new ValidationException("Input name: " + input.getName() + " is already used");
}
}
}
-
- if(outputs != null) {
+
+ if (outputs != null) {
for (Output output : outputs.getOutputs()) {
if (!datasetNames.add(output.getName())) {
throw new ValidationException("Output name: " + output.getName() + " is already used");
@@ -163,27 +157,28 @@ public class ProcessEntityParser extends EntityParser<Process> {
}
private void validateLateInputs(Process process) throws ValidationException {
- Map<String,String> feeds = new HashMap<String,String>();
- if(process.getInputs() != null) {
+ Map<String, String> feeds = new HashMap<String, String>();
+ if (process.getInputs() != null) {
for (Input in : process.getInputs().getInputs()) {
- feeds.put(in.getName(),in.getFeed());
+ feeds.put(in.getName(), in.getFeed());
}
}
if (process.getLateProcess() != null) {
- for (LateInput lp : process.getLateProcess().getLateInputs()) {
- if (!feeds.keySet().contains(lp.getInput())){
- throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
- }
- try {
- Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
- if(feed.getLateArrival()==null){
- throw new ValidationException("Late Input feed: "+lp.getInput()+" is not configured with late arrival cut-off" );
- }
- } catch (FalconException e) {
- throw new ValidationException(e);
- }
- }
+ for (LateInput lp : process.getLateProcess().getLateInputs()) {
+ if (!feeds.keySet().contains(lp.getInput())) {
+ throw new ValidationException("Late Input: " + lp.getInput() + " is not specified in the inputs");
+ }
+ try {
+ Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, feeds.get(lp.getInput()));
+ if (feed.getLateArrival() == null) {
+ throw new ValidationException(
+ "Late Input feed: " + lp.getInput() + " is not configured with late arrival cut-off");
+ }
+ } catch (FalconException e) {
+ throw new ValidationException(e);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java b/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
index fc9ccdf..e01a378 100644
--- a/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
+++ b/common/src/main/java/org/apache/falcon/entity/parser/ValidationException.java
@@ -21,22 +21,20 @@ package org.apache.falcon.entity.parser;
import org.apache.falcon.FalconException;
/**
- *
* ValidationException during parsing
- *
*/
public class ValidationException extends FalconException {
- public ValidationException(String message) {
- super(message);
- }
+ public ValidationException(String message) {
+ super(message);
+ }
- public ValidationException(Exception e) {
- super(e);
+ public ValidationException(Exception e) {
+ super(e);
}
- public ValidationException(String message, Exception e) {
- super(message, e);
+ public ValidationException(String message, Exception e) {
+ super(message, e);
}
private static final long serialVersionUID = -4502166408759507355L;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index c882453..8fd3775 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -18,25 +18,6 @@
package org.apache.falcon.entity.store;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.xml.bind.JAXBException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
@@ -44,8 +25,21 @@ import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
+import javax.xml.bind.JAXBException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
public class ConfigurationStore implements FalconService {
private static final Logger LOG = Logger.getLogger(ConfigurationStore.class);
@@ -62,7 +56,8 @@ public class ConfigurationStore implements FalconService {
return store;
}
- private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary = new HashMap<EntityType, ConcurrentHashMap<String, Entity>>();
+ private final Map<EntityType, ConcurrentHashMap<String, Entity>> dictionary
+ = new HashMap<EntityType, ConcurrentHashMap<String, Entity>>();
private final FileSystem fs;
private final Path storePath;
@@ -98,13 +93,15 @@ public class ConfigurationStore implements FalconService {
public void init() throws FalconException {
String listenerClassNames = StartupProperties.get().
getProperty("configstore.listeners", "org.apache.falcon.entity.v0.EntityGraph");
- for(String listenerClassName:listenerClassNames.split(",")) {
+ for (String listenerClassName : listenerClassNames.split(",")) {
listenerClassName = listenerClassName.trim();
- if (listenerClassName.isEmpty()) continue;
+ if (listenerClassName.isEmpty()) {
+ continue;
+ }
ConfigurationChangeListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName);
registerListener(listener);
}
-
+
try {
for (EntityType type : EntityType.values()) {
ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
@@ -113,7 +110,7 @@ public class ConfigurationStore implements FalconService {
for (FileStatus file : files) {
String fileName = file.getPath().getName();
String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop
- // ".xml"
+ // ".xml"
String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
Entity entity = restore(type, entityName);
entityMap.put(entityName, entity);
@@ -131,11 +128,8 @@ public class ConfigurationStore implements FalconService {
}
/**
- *
- * @param type
- * - EntityType that need to be published
- * @param entity
- * - Reference to the Entity Object
+ * @param type - EntityType that need to be published
+ * @param entity - Reference to the Entity Object
* @throws FalconException
*/
public synchronized void publish(EntityType type, Entity entity) throws FalconException {
@@ -145,8 +139,9 @@ public class ConfigurationStore implements FalconService {
dictionary.get(type).put(entity.getName(), entity);
onAdd(entity);
} else {
- throw new EntityAlreadyExistsException(entity.toShortString() + " already registered with configuration store. "
- + "Can't be submitted again. Try removing before submitting.");
+ throw new EntityAlreadyExistsException(
+ entity.toShortString() + " already registered with configuration store. "
+ + "Can't be submitted again. Try removing before submitting.");
}
} catch (IOException e) {
throw new StoreAccessException(e);
@@ -193,25 +188,22 @@ public class ConfigurationStore implements FalconService {
private void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
for (ConfigurationChangeListener listener : listeners) {
- listener.onChange(oldEntity, newEntity);
+ listener.onChange(oldEntity, newEntity);
}
}
public synchronized void initiateUpdate(Entity entity) throws FalconException {
if (get(entity.getEntityType(), entity.getName()) == null || updatesInProgress.get() != null) {
- throw new FalconException("An update for " + entity.toShortString() + " is already in progress or doesn't exist");
+ throw new FalconException(
+ "An update for " + entity.toShortString() + " is already in progress or doesn't exist");
}
updatesInProgress.set(entity);
}
/**
- *
- * @param type
- * - Entity type that is being retrieved
- * @param name
- * - Name as it appears in the entity xml definition
- * @param <T>
- * - Actual Entity object type
+ * @param type - Entity type that is being retrieved
+ * @param name - Name as it appears in the entity xml definition
+ * @param <T> - Actual Entity object type
* @return - Entity object from internal dictionary, If the object is not
* loaded in memory yet, it will retrieve it from persistent store
* just in time. On startup all the entities will be added to the
@@ -249,11 +241,9 @@ public class ConfigurationStore implements FalconService {
/**
* Remove an entity which is already stored in the config store
- *
- * @param type
- * - Entity type being removed
- * @param name
- * - Name of the entity object being removed
+ *
+ * @param type - Entity type being removed
+ * @param name - Name of the entity object being removed
* @return - True is remove is successful, false if request entity doesn't
* exist
* @throws FalconException
@@ -279,19 +269,17 @@ public class ConfigurationStore implements FalconService {
listener.onRemove(entity);
} catch (Throwable e) {
LOG.warn(
- "Encountered exception while notifying " + listener + "(" + entity.getEntityType() + ") " + entity.getName(),
+ "Encountered exception while notifying " + listener + "(" + entity.getEntityType() + ") "
+ + entity.getName(),
e);
}
}
}
/**
- *
- * @param type
- * - Entity type that needs to be searched
- * @param keywords
- * - List of keywords to search for. only entities that have all
- * the keywords being searched would be returned
+ * @param type - Entity type that needs to be searched
+ * @param keywords - List of keywords to search for. only entities that have all
+ * the keywords being searched would be returned
* @return - Array of entity types
*/
public Entity[] search(EntityType type, String... keywords) {
@@ -299,20 +287,17 @@ public class ConfigurationStore implements FalconService {
}
/**
- *
- * @param type
- * - Entity type that is to be stored into persistent storage
- * @param entity
- * - entity to persist. JAXB Annotated entity will be marshalled
- * to the persistent store. The convention used for storing the
- * object:: PROP(config.store.uri)/{entitytype}/{entityname}.xml
- * @throws java.io.IOException
- * If any error in accessing the storage
+ * @param type - Entity type that is to be stored into persistent storage
+ * @param entity - entity to persist. JAXB Annotated entity will be marshalled
+ * to the persistent store. The convention used for storing the
+ * object:: PROP(config.store.uri)/{entitytype}/{entityname}.xml
+ * @throws java.io.IOException If any error in accessing the storage
* @throws FalconException
*/
private void persist(EntityType type, Entity entity) throws IOException, FalconException {
OutputStream out = fs
- .create(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml"));
+ .create(new Path(storePath,
+ type + Path.SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml"));
try {
type.getMarshaller().marshal(entity, out);
LOG.info("Persisted configuration " + type + "/" + entity.getName());
@@ -326,37 +311,31 @@ public class ConfigurationStore implements FalconService {
/**
* Archive removed configuration in the persistent store
- *
- * @param type
- * - Entity type to archive
- * @param name
- * - name
- * @throws IOException
- * If any error in accessing the storage
+ *
+ * @param type - Entity type to archive
+ * @param name - name
+ * @throws IOException If any error in accessing the storage
*/
private void archive(EntityType type, String name) throws IOException {
Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type);
fs.mkdirs(archivePath);
- fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"), new Path(archivePath,
- URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
+ fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"),
+ new Path(archivePath,
+ URLEncoder.encode(name, UTF_8) + "." + System.currentTimeMillis()));
LOG.info("Archived configuration " + type + "/" + name);
}
/**
- *
- * @param type
- * - Entity type to restore from persistent store
- * @param name
- * - Name of the entity to restore.
- * @param <T>
- * - Actual entity object type
+ * @param type - Entity type to restore from persistent store
+ * @param name - Name of the entity to restore.
+ * @param <T> - Actual entity object type
* @return - De-serialized entity object restored from persistent store
- * @throws IOException
- * If any error in accessing the storage
+ * @throws IOException If any error in accessing the storage
* @throws FalconException
*/
@SuppressWarnings("unchecked")
- private synchronized <T extends Entity> T restore(EntityType type, String name) throws IOException, FalconException {
+ private synchronized <T extends Entity> T restore(EntityType type, String name)
+ throws IOException, FalconException {
InputStream in = fs.open(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"));
try {
@@ -379,5 +358,6 @@ public class ConfigurationStore implements FalconService {
}
@Override
- public void destroy() { }
+ public void destroy() {
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
index b9b504d..a231242 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
@@ -22,14 +22,14 @@ import org.apache.falcon.FalconException;
public class StoreAccessException extends FalconException {
- /**
- * @param e Exception
- */
- public StoreAccessException(String message, Exception e) {
- super(message, e);
- }
+ /**
+ * @param e Exception
+ */
+ public StoreAccessException(String message, Exception e) {
+ super(message, e);
+ }
- public StoreAccessException(Exception e) {
- super(e);
- }
+ public StoreAccessException(Exception e) {
+ super(e);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
index 75d6d8b..f2b66e5 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
@@ -18,12 +18,6 @@
package org.apache.falcon.entity.v0;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.feed.Feed;
@@ -34,6 +28,12 @@ import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.service.ConfigurationChangeListener;
import org.apache.log4j.Logger;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
public class EntityGraph implements ConfigurationChangeListener {
private static Logger LOG = Logger.getLogger(EntityGraph.class);
@@ -76,8 +76,9 @@ public class EntityGraph implements ConfigurationChangeListener {
nodeEdges = getEdgesFor((Feed) entity);
break;
}
- if (nodeEdges == null)
+ if (nodeEdges == null) {
return;
+ }
LOG.trace("Adding edges for " + entity.getName() + ": " + nodeEdges);
for (Map.Entry<Node, Set<Node>> entry : nodeEdges.entrySet()) {
@@ -101,8 +102,9 @@ public class EntityGraph implements ConfigurationChangeListener {
nodeEdges = getEdgesFor((Feed) entity);
break;
}
- if (nodeEdges == null)
+ if (nodeEdges == null) {
return;
+ }
for (Map.Entry<Node, Set<Node>> entry : nodeEdges.entrySet()) {
if (graph.containsKey(entry.getKey())) {
@@ -154,7 +156,7 @@ public class EntityGraph implements ConfigurationChangeListener {
nodeEdges.put(clusterNode, new HashSet<Node>());
nodeEdges.get(clusterNode).add(processNode);
}
-
+
return nodeEdges;
}
@@ -188,19 +190,23 @@ public class EntityGraph implements ConfigurationChangeListener {
@Override
public boolean equals(Object o) {
- if (this == o)
+ if (this == o) {
return true;
- if (o == null || getClass() != o.getClass())
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
+ }
Node node = (Node) o;
boolean nameEqual = name != null ? !name.equals(node.name) : node.name != null;
- if (nameEqual)
+ if (nameEqual) {
return false;
- if (type != node.type)
+ }
+ if (type != node.type) {
return false;
+ }
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
index 1a9febc..b523c8b 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityIntegrityChecker.java
@@ -18,21 +18,22 @@
package org.apache.falcon.entity.v0;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.Pair;
-
public class EntityIntegrityChecker {
public static Pair<String, EntityType>[] referencedBy(Entity entity) throws FalconException {
Set<Entity> deps = EntityGraph.get().getDependents(entity);
- if(deps == null)
+ if (deps == null) {
return null;
-
+ }
+
switch (entity.getEntityType()) {
case CLUSTER:
return filter(deps, EntityType.FEED, EntityType.PROCESS);
@@ -46,12 +47,14 @@ public class EntityIntegrityChecker {
}
@SuppressWarnings("unchecked")
- private static Pair<String, EntityType>[] filter(Set<Entity> deps, EntityType ... types) {
- List<Pair<String, EntityType>> filteredSet = new ArrayList<Pair<String,EntityType>>();
+ private static Pair<String, EntityType>[] filter(Set<Entity> deps, EntityType... types) {
+ List<Pair<String, EntityType>> filteredSet = new ArrayList<Pair<String, EntityType>>();
List<EntityType> validTypes = Arrays.asList(types);
- for(Entity dep:deps)
- if(validTypes.contains(dep.getEntityType()))
+ for (Entity dep : deps) {
+ if (validTypes.contains(dep.getEntityType())) {
filteredSet.add(Pair.of(dep.getName(), dep.getEntityType()));
+ }
+ }
return filteredSet.toArray(new Pair[0]);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java b/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
index bbf2749..cad196b 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/UnschedulableEntityException.java
@@ -24,7 +24,6 @@ import org.apache.falcon.FalconException;
* This exception is thrown when Unschedulable entity
* like CLUSTER is tried with actions like Schedule, Suspend,
* Resume.
- *
*/
public class UnschedulableEntityException extends FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
index 02e76a7..f57ef95 100644
--- a/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
+++ b/common/src/main/java/org/apache/falcon/expression/ExpressionHelper.java
@@ -49,12 +49,13 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
return instance;
}
- private ExpressionHelper() {}
+ private ExpressionHelper() {
+ }
public <T> T evaluate(String expression, Class<T> clazz) throws FalconException {
return evaluateFullExpression("${" + expression + "}", clazz);
}
-
+
@SuppressWarnings("unchecked")
public <T> T evaluateFullExpression(String expression, Class<T> clazz) throws FalconException {
try {
@@ -67,8 +68,9 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
@Override
public Method resolveFunction(String prefix, String name) {
for (Method method : ExpressionHelper.class.getDeclaredMethods()) {
- if (method.getName().equals(name))
+ if (method.getName().equals(name)) {
return method;
+ }
}
throw new UnsupportedOperationException("Not found " + prefix + ":" + name);
}
@@ -146,15 +148,15 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
}
public static Date latest(int n) {
- //by pass Falcon validations
- return referenceDate.get();
+ //by pass Falcon validations
+ return referenceDate.get();
}
-
+
public static Date future(int n, int limit) {
- //by pass Falcon validations
- return referenceDate.get();
+ //by pass Falcon validations
+ return referenceDate.get();
}
-
+
public static long hours(int val) {
return TimeUnit.HOURS.toMillis(val);
}
@@ -180,19 +182,19 @@ public final class ExpressionHelper implements FunctionMapper, VariableResolver
}
public static String substitute(String originalValue, Properties properties) {
- Matcher envVarMatcher = sysPropertyPattern.matcher(originalValue);
- while (envVarMatcher.find()) {
- String envVar = originalValue.substring(envVarMatcher.start() + 2,
- envVarMatcher.end() - 1);
- String envVal = properties.getProperty(envVar, System.getenv(envVar));
-
- envVar = "\\$\\{" + envVar + "\\}";
- if (envVal != null) {
- originalValue = originalValue.replaceAll(envVar, Matcher.quoteReplacement(envVal));
- envVarMatcher = sysPropertyPattern.matcher(originalValue);
+ Matcher envVarMatcher = sysPropertyPattern.matcher(originalValue);
+ while (envVarMatcher.find()) {
+ String envVar = originalValue.substring(envVarMatcher.start() + 2,
+ envVarMatcher.end() - 1);
+ String envVal = properties.getProperty(envVar, System.getenv(envVar));
+
+ envVar = "\\$\\{" + envVar + "\\}";
+ if (envVal != null) {
+ originalValue = originalValue.replaceAll(envVar, Matcher.quoteReplacement(envVal));
+ envVarMatcher = sysPropertyPattern.matcher(originalValue);
+ }
}
- }
- return originalValue;
+ return originalValue;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/group/FeedGroup.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroup.java b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
index 4dade88..e40ab13 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroup.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroup.java
@@ -17,6 +17,11 @@
*/
package org.apache.falcon.group;
+import org.apache.falcon.entity.FeedHelper;
+import org.apache.falcon.entity.common.FeedDataPath;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.feed.LocationType;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -24,80 +29,75 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
-import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.common.FeedDataPath;
-import org.apache.falcon.entity.v0.Frequency;
-import org.apache.falcon.entity.v0.feed.LocationType;
-
/**
* Group, which represents a logical group of feeds which can belong to this
* group.
*/
public class FeedGroup {
-
- public FeedGroup(String group, Frequency frequency, String path) {
- this.name = group;
- this.frequency = frequency;
- this.datePattern = getDatePattern(path);
- this.feeds = Collections
- .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- }
- public static String getDatePattern(String path) {
- Matcher matcher = FeedDataPath.PATTERN.matcher(path);
- List<String> fields = new ArrayList<String>();
- while (matcher.find()) {
- String var = path.substring(matcher.start(), matcher.end());
- fields.add(var);
- }
- Collections.sort(fields);
- return fields.toString();
- }
+ public FeedGroup(String group, Frequency frequency, String path) {
+ this.name = group;
+ this.frequency = frequency;
+ this.datePattern = getDatePattern(path);
+ this.feeds = Collections
+ .newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ }
+
+ public static String getDatePattern(String path) {
+ Matcher matcher = FeedDataPath.PATTERN.matcher(path);
+ List<String> fields = new ArrayList<String>();
+ while (matcher.find()) {
+ String var = path.substring(matcher.start(), matcher.end());
+ fields.add(var);
+ }
+ Collections.sort(fields);
+ return fields.toString();
+ }
- private String name;
- private Frequency frequency;
- private String datePattern;
- private Set<String> feeds;
+ private String name;
+ private Frequency frequency;
+ private String datePattern;
+ private Set<String> feeds;
- public Set<String> getFeeds() {
- return feeds;
- }
+ public Set<String> getFeeds() {
+ return feeds;
+ }
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof FeedGroup) || obj == null) {
- return false;
- }
- FeedGroup group = (FeedGroup) obj;
- return (this.name.equals(group.getName())
- && this.frequency.equals(group.frequency)
- && this.datePattern
- .equals(group.datePattern));
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof FeedGroup) || obj == null) {
+ return false;
+ }
+ FeedGroup group = (FeedGroup) obj;
+ return (this.name.equals(group.getName())
+ && this.frequency.equals(group.frequency)
+ && this.datePattern
+ .equals(group.datePattern));
- }
+ }
- @Override
- public int hashCode() {
- return 127 * name.hashCode() + 31 * frequency.hashCode() + datePattern.hashCode();
- }
+ @Override
+ public int hashCode() {
+ return 127 * name.hashCode() + 31 * frequency.hashCode() + datePattern.hashCode();
+ }
- public String getName() {
- return name;
- }
+ public String getName() {
+ return name;
+ }
- public Frequency getFrequency() {
- return frequency;
- }
+ public Frequency getFrequency() {
+ return frequency;
+ }
- public String getDatePattern() {
- return datePattern;
- }
+ public String getDatePattern() {
+ return datePattern;
+ }
- public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
- if (this.frequency.equals(feed.getFrequency())
- && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()))) {
- return true;
- }
- return false;
- }
+ public boolean canContainFeed(org.apache.falcon.entity.v0.feed.Feed feed) {
+ if (this.frequency.equals(feed.getFrequency())
+ && this.datePattern.equals(getDatePattern(FeedHelper.getLocation(feed, LocationType.DATA).getPath()))) {
+ return true;
+ }
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index e154a14..532392f 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -17,12 +17,6 @@
*/
package org.apache.falcon.group;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.FeedHelper;
@@ -33,88 +27,94 @@ import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.service.ConfigurationChangeListener;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* Has 2 way mappings from feed to group and group to feed
*/
public class FeedGroupMap implements ConfigurationChangeListener {
- private static final FeedGroupMap instance = new FeedGroupMap();
- private Map<String, FeedGroup> groupsMapping = new ConcurrentHashMap<String, FeedGroup>();
-
- private FeedGroupMap() {
- // singleton
- }
-
- public static FeedGroupMap get() {
- return instance;
- }
-
- public Map<String, FeedGroup> getGroupsMapping() {
- return Collections.unmodifiableMap(groupsMapping);
- }
-
- @Override
- public void onAdd(Entity entity) throws FalconException {
-
- if (entity.getEntityType().equals(EntityType.FEED)) {
- Feed feed = (Feed) entity;
- if (feed.getGroups() == null || feed.getGroups().equals("")) {
- return;
- }
- Set<FeedGroup> groupSet = getGroups(feed);
- addGroups(feed.getName(), groupSet);
- }
-
- }
-
- @Override
- public void onRemove(Entity entity) throws FalconException {
- if (entity.getEntityType().equals(EntityType.FEED)) {
- Feed feed = (Feed) entity;
- if (StringUtils.isEmpty(feed.getGroups())) {
- return;
- }
- String[] groups = feed.getGroups().split(",");
- for (String group : groups) {
- groupsMapping.get(group).getFeeds().remove(entity.getName());
- if (groupsMapping.get(group).getFeeds().size() == 0) {
- groupsMapping.remove(group);
- }
- }
-
- }
-
- }
-
- @Override
- public void onChange(Entity oldEntity, Entity newEntity)
- throws FalconException {
- onRemove(oldEntity);
- onAdd(newEntity);
- }
-
- private void addGroups(String feed, Set<FeedGroup> groups) {
- for (FeedGroup group : groups) {
- if (groupsMapping.containsKey(group.getName())) {
- groupsMapping.get(group.getName()).getFeeds().add(feed);
- } else {
- group.getFeeds().add(feed);
- groupsMapping.put(group.getName(), group);
- }
- }
- }
-
- public Set<FeedGroup> getGroups(String groups, Frequency frequency, String path) {
- Set<FeedGroup> groupSet = new HashSet<FeedGroup>();
- String[] groupArray = groups.split(",");
- for (String group : groupArray) {
- groupSet.add(new FeedGroup(group, frequency, path));
- }
- return groupSet;
- }
-
- public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
- return getGroups(feed.getGroups(), feed.getFrequency(),
- FeedHelper.getLocation(feed, LocationType.DATA).getPath());
- }
+ private static final FeedGroupMap instance = new FeedGroupMap();
+ private Map<String, FeedGroup> groupsMapping = new ConcurrentHashMap<String, FeedGroup>();
+
+ private FeedGroupMap() {
+ // singleton
+ }
+
+ public static FeedGroupMap get() {
+ return instance;
+ }
+
+ public Map<String, FeedGroup> getGroupsMapping() {
+ return Collections.unmodifiableMap(groupsMapping);
+ }
+
+ @Override
+ public void onAdd(Entity entity) throws FalconException {
+
+ if (entity.getEntityType().equals(EntityType.FEED)) {
+ Feed feed = (Feed) entity;
+ if (feed.getGroups() == null || feed.getGroups().equals("")) {
+ return;
+ }
+ Set<FeedGroup> groupSet = getGroups(feed);
+ addGroups(feed.getName(), groupSet);
+ }
+
+ }
+
+ @Override
+ public void onRemove(Entity entity) throws FalconException {
+ if (entity.getEntityType().equals(EntityType.FEED)) {
+ Feed feed = (Feed) entity;
+ if (StringUtils.isEmpty(feed.getGroups())) {
+ return;
+ }
+ String[] groups = feed.getGroups().split(",");
+ for (String group : groups) {
+ groupsMapping.get(group).getFeeds().remove(entity.getName());
+ if (groupsMapping.get(group).getFeeds().size() == 0) {
+ groupsMapping.remove(group);
+ }
+ }
+
+ }
+
+ }
+
+ @Override
+ public void onChange(Entity oldEntity, Entity newEntity)
+ throws FalconException {
+ onRemove(oldEntity);
+ onAdd(newEntity);
+ }
+
+ private void addGroups(String feed, Set<FeedGroup> groups) {
+ for (FeedGroup group : groups) {
+ if (groupsMapping.containsKey(group.getName())) {
+ groupsMapping.get(group.getName()).getFeeds().add(feed);
+ } else {
+ group.getFeeds().add(feed);
+ groupsMapping.put(group.getName(), group);
+ }
+ }
+ }
+
+ public Set<FeedGroup> getGroups(String groups, Frequency frequency, String path) {
+ Set<FeedGroup> groupSet = new HashSet<FeedGroup>();
+ String[] groupArray = groups.split(",");
+ for (String group : groupArray) {
+ groupSet.add(new FeedGroup(group, frequency, path));
+ }
+ return groupSet;
+ }
+
+ public Set<FeedGroup> getGroups(org.apache.falcon.entity.v0.feed.Feed feed) {
+ return getGroups(feed.getGroups(), feed.getFrequency(),
+ FeedHelper.getLocation(feed, LocationType.DATA).getPath());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/security/CurrentUser.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/CurrentUser.java b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
index 80c4de9..9a3086c 100644
--- a/common/src/main/java/org/apache/falcon/security/CurrentUser.java
+++ b/common/src/main/java/org/apache/falcon/security/CurrentUser.java
@@ -40,7 +40,9 @@ public final class CurrentUser {
throw new IllegalStateException
("Bad user name sent for authentication");
}
- if (user.equals(getUserInternal())) return;
+ if (user.equals(getUserInternal())) {
+ return;
+ }
Subject subject = new Subject();
subject.getPrincipals().add(new FalconPrincipal(user));
@@ -66,7 +68,7 @@ public final class CurrentUser {
if (subject == null) {
return null;
} else {
- for(FalconPrincipal principal: subject.
+ for (FalconPrincipal principal : subject.
getPrincipals(FalconPrincipal.class)) {
return principal.getName();
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java b/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
index d6fa26d..a27a342 100644
--- a/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
+++ b/common/src/main/java/org/apache/falcon/security/FalconLoginModule.java
@@ -40,7 +40,7 @@ public class FalconLoginModule implements LoginModule {
}
private <T extends Principal> T getCanonicalUser(Class<T> cls) {
- for(T user: subject.getPrincipals(cls)) {
+ for (T user : subject.getPrincipals(cls)) {
return user;
}
return null;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java b/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
index b88f27a..20ec8df 100644
--- a/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
+++ b/common/src/main/java/org/apache/falcon/security/FalconSecurityConfiguration.java
@@ -27,7 +27,7 @@ public class FalconSecurityConfiguration extends Configuration {
private static final AppConfigurationEntry OS_SPECIFIC_LOGIN =
new AppConfigurationEntry(SecurityConstants.OS_LOGIN_MODULE_NAME,
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
- new HashMap<String,String>());
+ new HashMap<String, String>());
private static final AppConfigurationEntry[] SIMPLE_CONF =
new AppConfigurationEntry[]{OS_SPECIFIC_LOGIN};
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
index c1fe265..17d5926 100644
--- a/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
+++ b/common/src/main/java/org/apache/falcon/service/LogCleanupService.java
@@ -17,13 +17,6 @@
*/
package org.apache.falcon.service;
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import javax.servlet.jsp.el.ELException;
-import javax.servlet.jsp.el.ExpressionEvaluator;
-
import org.apache.commons.el.ExpressionEvaluatorImpl;
import org.apache.falcon.FalconException;
import org.apache.falcon.aspect.GenericAlert;
@@ -34,58 +27,64 @@ import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.util.StartupProperties;
import org.apache.log4j.Logger;
+import javax.servlet.jsp.el.ELException;
+import javax.servlet.jsp.el.ExpressionEvaluator;
+import java.util.Date;
+import java.util.Timer;
+import java.util.TimerTask;
+
public class LogCleanupService implements FalconService {
- private static final Logger LOG = Logger.getLogger(LogCleanupService.class);
- private final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
- private final ExpressionHelper resolver = ExpressionHelper.get();
+ private static final Logger LOG = Logger.getLogger(LogCleanupService.class);
+ private final ExpressionEvaluator EVALUATOR = new ExpressionEvaluatorImpl();
+ private final ExpressionHelper resolver = ExpressionHelper.get();
- @Override
- public String getName() {
- return "Falcon Log cleanup service";
- }
+ @Override
+ public String getName() {
+ return "Falcon Log cleanup service";
+ }
- @Override
- public void init() throws FalconException {
- Timer timer = new Timer();
- timer.schedule(new CleanupThread(), 0, getDelay());
- LOG.info("Falcon log cleanup service initialized");
+ @Override
+ public void init() throws FalconException {
+ Timer timer = new Timer();
+ timer.schedule(new CleanupThread(), 0, getDelay());
+ LOG.info("Falcon log cleanup service initialized");
- }
+ }
- private class CleanupThread extends TimerTask {
+ private class CleanupThread extends TimerTask {
- private AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
- private AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
+ private AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler();
+ private AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler();
- @Override
- public void run() {
- try {
- LOG.info("Cleaning up logs at: " + new Date());
- processCleanupHandler.cleanup();
- feedCleanupHandler.cleanup();
- } catch (Throwable t) {
- LOG.error("Error in cleanup task: ", t);
- GenericAlert.alertLogCleanupServiceFailed(
- "Exception in log cleanup service", t);
- }
- }
- }
+ @Override
+ public void run() {
+ try {
+ LOG.info("Cleaning up logs at: " + new Date());
+ processCleanupHandler.cleanup();
+ feedCleanupHandler.cleanup();
+ } catch (Throwable t) {
+ LOG.error("Error in cleanup task: ", t);
+ GenericAlert.alertLogCleanupServiceFailed(
+ "Exception in log cleanup service", t);
+ }
+ }
+ }
- @Override
- public void destroy() throws FalconException {
- LOG.info("Falcon log cleanup service destroyed");
- }
+ @Override
+ public void destroy() throws FalconException {
+ LOG.info("Falcon log cleanup service destroyed");
+ }
- private long getDelay() throws FalconException {
- String delay = StartupProperties.get().getProperty(
- "falcon.cleanup.service.frequency", "days(1)");
- try {
- return (Long) EVALUATOR.evaluate("${" + delay + "}", Long.class,
- resolver, resolver);
- } catch (ELException e) {
- throw new FalconException("Exception in EL evaluation", e);
- }
- }
+ private long getDelay() throws FalconException {
+ String delay = StartupProperties.get().getProperty(
+ "falcon.cleanup.service.frequency", "days(1)");
+ try {
+ return (Long) EVALUATOR.evaluate("${" + delay + "}", Long.class,
+ resolver, resolver);
+ } catch (ELException e) {
+ throw new FalconException("Exception in EL evaluation", e);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java b/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java
index f44ad12..466cb81 100644
--- a/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java
+++ b/common/src/main/java/org/apache/falcon/service/ServiceInitializer.java
@@ -33,13 +33,15 @@ public class ServiceInitializer {
getProperty("application.services", "org.apache.falcon.entity.store.ConfigurationStore");
for (String serviceClassName : serviceClassNames.split(",")) {
serviceClassName = serviceClassName.trim();
- if (serviceClassName.isEmpty()) continue;
+ if (serviceClassName.isEmpty()) {
+ continue;
+ }
FalconService service = ReflectionUtils.getInstanceByClassName(serviceClassName);
services.register(service);
LOG.info("Initializing service : " + serviceClassName);
try {
service.init();
- } catch(Throwable t) {
+ } catch (Throwable t) {
LOG.fatal("Failed to initialize service " + serviceClassName, t);
throw new FalconException(t);
}
@@ -52,7 +54,7 @@ public class ServiceInitializer {
LOG.info("Destroying service : " + service.getClass().getName());
try {
service.destroy();
- } catch(Throwable t) {
+ } catch (Throwable t) {
LOG.fatal("Failed to destroy service " + service.getClass().getName(), t);
throw new FalconException(t);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/service/Services.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/Services.java b/common/src/main/java/org/apache/falcon/service/Services.java
index dc41d5d..955e906 100644
--- a/common/src/main/java/org/apache/falcon/service/Services.java
+++ b/common/src/main/java/org/apache/falcon/service/Services.java
@@ -18,22 +18,23 @@
package org.apache.falcon.service;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.util.ReflectionUtils;
+import org.apache.log4j.Logger;
+
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.NoSuchElementException;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.util.ReflectionUtils;
-import org.apache.log4j.Logger;
-
public final class Services implements Iterable<FalconService> {
private static final Logger LOG = Logger.getLogger(Services.class);
private static Services instance = new Services();
- private Services() { }
+ private Services() {
+ }
public static Services get() {
return instance;
@@ -80,8 +81,8 @@ public final class Services implements Iterable<FalconService> {
register(service);
return service;
}
-
- public void reset(){
- services.clear();
+
+ public void reset() {
+ services.clear();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
index 7f36e12..4e199da 100644
--- a/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
+++ b/common/src/main/java/org/apache/falcon/update/UpdateHelper.java
@@ -18,9 +18,6 @@
package org.apache.falcon.update;
-import java.util.ArrayList;
-import java.util.List;
-
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
@@ -36,31 +33,40 @@ import org.apache.falcon.entity.v0.process.Inputs;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.List;
+
public final class UpdateHelper {
private static final Logger LOG = Logger.getLogger(UpdateHelper.class);
- private static final String[] FEED_FIELDS = new String[] { "partitions", "groups", "lateArrival.cutOff", "schema.location", "schema.provider",
- "ACL.group", "ACL.owner", "ACL.permission"};
- private static final String[] PROCESS_FIELDS = new String[] { "retry.policy", "retry.delay", "retry.attempts",
- "lateProcess.policy", "lateProcess.delay", "lateProcess.lateInputs[\\d+].input", "lateProcess.lateInputs[\\d+].workflowPath"};
-
+ private static final String[] FEED_FIELDS = new String[]{"partitions", "groups", "lateArrival.cutOff",
+ "schema.location", "schema.provider",
+ "ACL.group", "ACL.owner", "ACL.permission"};
+ private static final String[] PROCESS_FIELDS = new String[]{"retry.policy", "retry.delay", "retry.attempts",
+ "lateProcess.policy", "lateProcess.delay",
+ "lateProcess.lateInputs[\\d+].input",
+ "lateProcess.lateInputs[\\d+].workflowPath"};
+
public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, String cluster) throws FalconException {
Entity oldView = EntityUtil.getClusterView(oldEntity, cluster);
Entity newView = EntityUtil.getClusterView(newEntity, cluster);
- switch(oldEntity.getEntityType()) {
+ switch (oldEntity.getEntityType()) {
case FEED:
- if(EntityUtil.equals(oldView, newView, FEED_FIELDS))
+ if (EntityUtil.equals(oldView, newView, FEED_FIELDS)) {
return false;
+ }
return true;
-
+
case PROCESS:
- if(EntityUtil.equals(oldView, newView, PROCESS_FIELDS))
+ if (EntityUtil.equals(oldView, newView, PROCESS_FIELDS)) {
return false;
+ }
return true;
}
throw new IllegalArgumentException("Unhandled entity type " + oldEntity.getEntityType());
}
- public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity) throws FalconException {
+ public static boolean shouldUpdate(Entity oldEntity, Entity newEntity, Entity affectedEntity)
+ throws FalconException {
if (oldEntity.getEntityType() == EntityType.FEED && affectedEntity.getEntityType() == EntityType.PROCESS) {
return shouldUpdate((Feed) oldEntity, (Feed) newEntity, (Process) affectedEntity);
} else {
@@ -71,31 +77,33 @@ public final class UpdateHelper {
}
public static boolean shouldUpdate(Feed oldFeed, Feed newFeed, Process affectedProcess) {
- if (!FeedHelper
- .getLocation(oldFeed.getLocations(), LocationType.DATA)
- .getPath()
- .equals(FeedHelper.getLocation(newFeed.getLocations(),
- LocationType.DATA).getPath())
- || !FeedHelper
- .getLocation(oldFeed.getLocations(), LocationType.META)
- .getPath()
- .equals(FeedHelper.getLocation(newFeed.getLocations(),
- LocationType.META).getPath())
- || !FeedHelper
- .getLocation(oldFeed.getLocations(), LocationType.STATS)
- .getPath()
- .equals(FeedHelper.getLocation(newFeed.getLocations(),
- LocationType.STATS).getPath())
- || !FeedHelper
- .getLocation(oldFeed.getLocations(), LocationType.TMP)
- .getPath()
- .equals(FeedHelper.getLocation(newFeed.getLocations(),
- LocationType.TMP).getPath()))
+ if (!FeedHelper
+ .getLocation(oldFeed.getLocations(), LocationType.DATA)
+ .getPath()
+ .equals(FeedHelper.getLocation(newFeed.getLocations(),
+ LocationType.DATA).getPath())
+ || !FeedHelper
+ .getLocation(oldFeed.getLocations(), LocationType.META)
+ .getPath()
+ .equals(FeedHelper.getLocation(newFeed.getLocations(),
+ LocationType.META).getPath())
+ || !FeedHelper
+ .getLocation(oldFeed.getLocations(), LocationType.STATS)
+ .getPath()
+ .equals(FeedHelper.getLocation(newFeed.getLocations(),
+ LocationType.STATS).getPath())
+ || !FeedHelper
+ .getLocation(oldFeed.getLocations(), LocationType.TMP)
+ .getPath()
+ .equals(FeedHelper.getLocation(newFeed.getLocations(),
+ LocationType.TMP).getPath())) {
return true;
+ }
LOG.debug(oldFeed.toShortString() + ": Location identical. Ignoring...");
- if (!oldFeed.getFrequency().equals(newFeed.getFrequency()))
+ if (!oldFeed.getFrequency().equals(newFeed.getFrequency())) {
return true;
+ }
LOG.debug(oldFeed.toShortString() + ": Frequency identical. Ignoring...");
// it is not possible to have oldFeed partitions as non empty and
@@ -118,39 +126,42 @@ public final class UpdateHelper {
if (newFeed.getPartitions() != null && oldFeed.getPartitions() != null) {
List<String> newParts = getPartitions(newFeed.getPartitions());
List<String> oldParts = getPartitions(oldFeed.getPartitions());
- if (newParts.size() != oldParts.size())
+ if (newParts.size() != oldParts.size()) {
return true;
- if (!newParts.containsAll(oldParts))
+ }
+ if (!newParts.containsAll(oldParts)) {
return true;
+ }
}
LOG.debug(oldFeed.toShortString() + ": Partitions identical. Ignoring...");
}
}
for (Cluster cluster : affectedProcess.getClusters().getClusters()) {
- if (!FeedHelper
- .getCluster(oldFeed, cluster.getName())
- .getValidity()
- .getStart()
- .equals(FeedHelper.getCluster(newFeed, cluster.getName())
- .getValidity().getStart())
- || !FeedHelper.getLocation(oldFeed, LocationType.DATA,
- cluster.getName()).getPath().equals(
- FeedHelper.getLocation(newFeed, LocationType.DATA,
- cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.META,
- cluster.getName()).getPath().equals(
- FeedHelper.getLocation(newFeed, LocationType.META,
- cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.STATS,
- cluster.getName()).getPath().equals(
- FeedHelper.getLocation(newFeed, LocationType.STATS,
- cluster.getName()).getPath())
- || !FeedHelper.getLocation(oldFeed, LocationType.TMP,
- cluster.getName()).getPath().equals(
- FeedHelper.getLocation(newFeed, LocationType.TMP,
- cluster.getName()).getPath()))
- return true;
+ if (!FeedHelper
+ .getCluster(oldFeed, cluster.getName())
+ .getValidity()
+ .getStart()
+ .equals(FeedHelper.getCluster(newFeed, cluster.getName())
+ .getValidity().getStart())
+ || !FeedHelper.getLocation(oldFeed, LocationType.DATA,
+ cluster.getName()).getPath().equals(
+ FeedHelper.getLocation(newFeed, LocationType.DATA,
+ cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.META,
+ cluster.getName()).getPath().equals(
+ FeedHelper.getLocation(newFeed, LocationType.META,
+ cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.STATS,
+ cluster.getName()).getPath().equals(
+ FeedHelper.getLocation(newFeed, LocationType.STATS,
+ cluster.getName()).getPath())
+ || !FeedHelper.getLocation(oldFeed, LocationType.TMP,
+ cluster.getName()).getPath().equals(
+ FeedHelper.getLocation(newFeed, LocationType.TMP,
+ cluster.getName()).getPath())) {
+ return true;
+ }
LOG.debug(oldFeed.toShortString() + ": Feed on cluster" + cluster.getName() + " identical. Ignoring...");
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
index f618876..f5dbc83 100644
--- a/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/ApplicationProperties.java
@@ -18,6 +18,10 @@
package org.apache.falcon.util;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.expression.ExpressionHelper;
+import org.apache.log4j.Logger;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -26,10 +30,6 @@ import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.expression.ExpressionHelper;
-import org.apache.log4j.Logger;
-
public abstract class ApplicationProperties extends Properties {
private static Logger LOG = Logger.getLogger(ApplicationProperties.class);
@@ -52,7 +52,7 @@ public abstract class ApplicationProperties extends Properties {
public String getDomain() {
return domain;
}
-
+
protected void initialize() {
String propFile = getPropertyFile();
String userHome = System.getProperty("user.home");
@@ -68,7 +68,8 @@ public abstract class ApplicationProperties extends Properties {
location = LocationType.FILE;
propertyFile = new File(confDir, propFile).getAbsolutePath();
} else {
- LOG.info("config.location is not set, properties file not present in " + "user home dir, falling back to classpath for "
+ LOG.info("config.location is not set, properties file not present in "
+ + "user home dir, falling back to classpath for "
+ propFile);
location = LocationType.CLASSPATH;
propertyFile = propFile;
@@ -99,15 +100,16 @@ public abstract class ApplicationProperties extends Properties {
LOG.info("Loading properties from " + propertyFile);
Properties origProps = new Properties();
origProps.load(resource);
- if(domain == null) {
+ if (domain == null) {
domain = origProps.getProperty("*.domain");
- if(domain == null)
+ if (domain == null) {
throw new FalconException("Domain is not set!");
+ }
}
LOG.info("Initializing properties with domain " + domain);
-
+
Set<String> keys = getKeys(origProps.keySet());
- for(String key:keys) {
+ for (String key : keys) {
String value = origProps.getProperty(domain + "." + key, origProps.getProperty("*." + key));
value = ExpressionHelper.substitute(value);
LOG.debug(key + "=" + value);
@@ -124,7 +126,7 @@ public abstract class ApplicationProperties extends Properties {
private Set<String> getKeys(Set<Object> keySet) {
Set<String> keys = new HashSet<String>();
- for(Object keyObj:keySet) {
+ for (Object keyObj : keySet) {
String key = (String) keyObj;
keys.add(key.substring(key.indexOf('.') + 1));
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/BuildProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/BuildProperties.java b/common/src/main/java/org/apache/falcon/util/BuildProperties.java
index e91a647..898daee 100644
--- a/common/src/main/java/org/apache/falcon/util/BuildProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/BuildProperties.java
@@ -27,26 +27,26 @@ public class BuildProperties extends ApplicationProperties {
private static final String PROPERTY_FILE = "falcon-buildinfo.properties";
private static final AtomicReference<BuildProperties> instance =
- new AtomicReference<BuildProperties>();
+ new AtomicReference<BuildProperties>();
private BuildProperties() throws FalconException {
- super();
+ super();
}
@Override
protected String getPropertyFile() {
- return PROPERTY_FILE;
+ return PROPERTY_FILE;
}
public static Properties get() {
- try {
- if (instance.get() == null) {
- instance.compareAndSet(null, new BuildProperties());
+ try {
+ if (instance.get() == null) {
+ instance.compareAndSet(null, new BuildProperties());
+ }
+ return instance.get();
+ } catch (FalconException e) {
+ throw new RuntimeException("Unable to read application " +
+ "falcon build information properties", e);
}
- return instance.get();
- } catch (FalconException e) {
- throw new RuntimeException("Unable to read application " +
- "falcon build information properties", e);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
index d13b817..4e2f7db 100644
--- a/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/DeploymentProperties.java
@@ -27,26 +27,26 @@ public class DeploymentProperties extends ApplicationProperties {
private static final String PROPERTY_FILE = "deploy.properties";
private static final AtomicReference<DeploymentProperties> instance =
- new AtomicReference<DeploymentProperties>();
+ new AtomicReference<DeploymentProperties>();
private DeploymentProperties() throws FalconException {
- super();
+ super();
}
@Override
protected String getPropertyFile() {
- return PROPERTY_FILE;
+ return PROPERTY_FILE;
}
public static Properties get() {
- try {
- if (instance.get() == null) {
- instance.compareAndSet(null, new DeploymentProperties());
+ try {
+ if (instance.get() == null) {
+ instance.compareAndSet(null, new DeploymentProperties());
+ }
+ return instance.get();
+ } catch (FalconException e) {
+ throw new RuntimeException("Unable to read application " +
+ "startup properties", e);
}
- return instance.get();
- } catch (FalconException e) {
- throw new RuntimeException("Unable to read application " +
- "startup properties", e);
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
index 86acb81..9aeb3ab 100644
--- a/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
+++ b/common/src/main/java/org/apache/falcon/util/DeploymentUtil.java
@@ -18,12 +18,12 @@
package org.apache.falcon.util;
-import java.util.HashSet;
-import java.util.Set;
-
import org.apache.falcon.entity.ColoClusterRelation;
import org.apache.log4j.Logger;
+import java.util.HashSet;
+import java.util.Set;
+
public class DeploymentUtil {
private static final Logger LOG = Logger.getLogger(DeploymentUtil.class);
@@ -35,7 +35,7 @@ public class DeploymentUtil {
protected final static String currentColo;
protected final static boolean embeddedMode;
protected static boolean prism = false;
-
+
static {
DEFAULT_ALL_COLOS.add(DEFAULT_COLO);
embeddedMode = DeploymentProperties.get().
@@ -44,33 +44,33 @@ public class DeploymentUtil {
currentColo = DEFAULT_COLO;
} else {
currentColo = StartupProperties.get().
- getProperty("current.colo", DEFAULT_COLO);
+ getProperty("current.colo", DEFAULT_COLO);
}
LOG.info("Running in embedded mode? " + embeddedMode);
LOG.info("Current colo: " + currentColo);
}
-
+
public static void setPrismMode() {
- prism = true;
+ prism = true;
}
-
+
public static boolean isPrism() {
- return !embeddedMode && prism;
+ return !embeddedMode && prism;
}
-
+
public static String getCurrentColo() {
return currentColo;
}
-
+
public static Set<String> getCurrentClusters() {
String colo = getCurrentColo();
return ColoClusterRelation.get().getClusters(colo);
}
-
+
public static boolean isEmbeddedMode() {
return embeddedMode;
}
-
+
public static String getDefaultColo() {
return DEFAULT_COLO;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
index 8f255fd..d1bed8e 100644
--- a/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
+++ b/common/src/main/java/org/apache/falcon/util/ReflectionUtils.java
@@ -18,10 +18,10 @@
package org.apache.falcon.util;
-import java.lang.reflect.Method;
-
import org.apache.falcon.FalconException;
+import java.lang.reflect.Method;
+
public final class ReflectionUtils {
public static <T> T getInstance(String classKey) throws FalconException {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java b/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java
index 6ecf33f..86a54f8 100644
--- a/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/RuntimeProperties.java
@@ -18,72 +18,72 @@
package org.apache.falcon.util;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.falcon.FalconException;
import org.apache.log4j.Logger;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
public class RuntimeProperties extends ApplicationProperties {
- private static Logger LOG = Logger.getLogger(RuntimeProperties.class);
+ private static Logger LOG = Logger.getLogger(RuntimeProperties.class);
- private static final String PROPERTY_FILE = "runtime.properties";
+ private static final String PROPERTY_FILE = "runtime.properties";
- private static final AtomicReference<RuntimeProperties> instance =
- new AtomicReference<RuntimeProperties>();
+ private static final AtomicReference<RuntimeProperties> instance =
+ new AtomicReference<RuntimeProperties>();
- private RuntimeProperties() throws FalconException {
- super();
- Thread refreshThread = new Thread(new DynamicLoader(this));
- refreshThread.start();
- }
+ private RuntimeProperties() throws FalconException {
+ super();
+ Thread refreshThread = new Thread(new DynamicLoader(this));
+ refreshThread.start();
+ }
- @Override
- protected String getPropertyFile() {
- return PROPERTY_FILE;
- }
+ @Override
+ protected String getPropertyFile() {
+ return PROPERTY_FILE;
+ }
- public static Properties get() {
- try {
- if (instance.get() == null) {
- instance.compareAndSet(null, new RuntimeProperties());
- }
- return instance.get();
- } catch (FalconException e) {
- throw new RuntimeException("Unable to read application " +
- "runtime properties", e);
+ public static Properties get() {
+ try {
+ if (instance.get() == null) {
+ instance.compareAndSet(null, new RuntimeProperties());
+ }
+ return instance.get();
+ } catch (FalconException e) {
+ throw new RuntimeException("Unable to read application " +
+ "runtime properties", e);
+ }
}
- }
- private class DynamicLoader implements Runnable {
+ private class DynamicLoader implements Runnable {
- private static final long REFRESH_DELAY = 300000L;
- private static final int MAX_ITER = 20; //1hr
- private final ApplicationProperties applicationProperties;
+ private static final long REFRESH_DELAY = 300000L;
+ private static final int MAX_ITER = 20; //1hr
+ private final ApplicationProperties applicationProperties;
- private DynamicLoader(ApplicationProperties applicationProperties) {
- this.applicationProperties = applicationProperties;
- }
+ private DynamicLoader(ApplicationProperties applicationProperties) {
+ this.applicationProperties = applicationProperties;
+ }
- @Override
- public void run() {
- long backOffDelay = REFRESH_DELAY;
- while (true) {
- try {
- try {
- applicationProperties.loadProperties();
- backOffDelay = REFRESH_DELAY;
- } catch (FalconException e) {
- LOG.warn("Error refreshing runtime properties", e);
- backOffDelay += REFRESH_DELAY;
- }
- Thread.sleep(Math.min(MAX_ITER * REFRESH_DELAY, backOffDelay));
- } catch (InterruptedException e) {
- LOG.info("Application is stopping. Aborting...");
- break;
+ @Override
+ public void run() {
+ long backOffDelay = REFRESH_DELAY;
+ while (true) {
+ try {
+ try {
+ applicationProperties.loadProperties();
+ backOffDelay = REFRESH_DELAY;
+ } catch (FalconException e) {
+ LOG.warn("Error refreshing runtime properties", e);
+ backOffDelay += REFRESH_DELAY;
+ }
+ Thread.sleep(Math.min(MAX_ITER * REFRESH_DELAY, backOffDelay));
+ } catch (InterruptedException e) {
+ LOG.info("Application is stopping. Aborting...");
+ break;
+ }
+ }
}
- }
}
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/util/StartupProperties.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/util/StartupProperties.java b/common/src/main/java/org/apache/falcon/util/StartupProperties.java
index e264758..4a19df4 100644
--- a/common/src/main/java/org/apache/falcon/util/StartupProperties.java
+++ b/common/src/main/java/org/apache/falcon/util/StartupProperties.java
@@ -18,36 +18,36 @@
package org.apache.falcon.util;
+import org.apache.falcon.FalconException;
+
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.falcon.FalconException;
-
public class StartupProperties extends ApplicationProperties {
- private static final String PROPERTY_FILE = "startup.properties";
-
- private static final AtomicReference<StartupProperties> instance =
- new AtomicReference<StartupProperties>();
-
- private StartupProperties() throws FalconException {
- super();
- }
-
- @Override
- protected String getPropertyFile() {
- return PROPERTY_FILE;
- }
-
- public static Properties get() {
- try {
- if (instance.get() == null) {
- instance.compareAndSet(null, new StartupProperties());
- }
- return instance.get();
- } catch (FalconException e) {
- throw new RuntimeException("Unable to read application " +
- "startup properties", e);
+ private static final String PROPERTY_FILE = "startup.properties";
+
+ private static final AtomicReference<StartupProperties> instance =
+ new AtomicReference<StartupProperties>();
+
+ private StartupProperties() throws FalconException {
+ super();
+ }
+
+ @Override
+ protected String getPropertyFile() {
+ return PROPERTY_FILE;
+ }
+
+ public static Properties get() {
+ try {
+ if (instance.get() == null) {
+ instance.compareAndSet(null, new StartupProperties());
+ }
+ return instance.get();
+ } catch (FalconException e) {
+ throw new RuntimeException("Unable to read application " +
+ "startup properties", e);
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
index 9b75327..76a9edc 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowBuilder.java
@@ -18,15 +18,15 @@
package org.apache.falcon.workflow;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.util.ReflectionUtils;
+
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.Entity;
-import org.apache.falcon.util.ReflectionUtils;
-
public abstract class WorkflowBuilder<T extends Entity> {
public static WorkflowBuilder<Entity> getBuilder(String engine, Entity entity) throws FalconException {
@@ -35,8 +35,9 @@ public abstract class WorkflowBuilder<T extends Entity> {
}
public abstract Map<String, Properties> newWorkflowSchedule(T entity, List<String> clusters) throws FalconException;
-
- public abstract Properties newWorkflowSchedule(T entity, Date startDate, String clusterName, String user) throws FalconException;
+
+ public abstract Properties newWorkflowSchedule(T entity, Date startDate, String clusterName, String user)
+ throws FalconException;
public abstract String[] getWorkflowNames(T entity);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
index 5880f68..a267e39 100644
--- a/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
+++ b/common/src/main/java/org/apache/falcon/workflow/WorkflowEngineFactory.java
@@ -27,11 +27,12 @@ public class WorkflowEngineFactory {
private static final String WORKFLOW_ENGINE = "workflow.engine.impl";
- private WorkflowEngineFactory() { }
+ private WorkflowEngineFactory() {
+ }
- public static AbstractWorkflowEngine getWorkflowEngine()
+ public static AbstractWorkflowEngine getWorkflowEngine()
throws FalconException {
return ReflectionUtils.getInstance(WORKFLOW_ENGINE);
- }
+ }
}