You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/07 16:54:07 UTC
falcon git commit: FALCON-1368 Improve Falcon server restart time.
Contributed by Sandeep Samudrala.
Repository: falcon
Updated Branches:
refs/heads/master fccfc1c63 -> 5c9fe56b2
FALCON-1368 Improve Falcon server restart time. Contributed by Sandeep Samudrala.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5c9fe56b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5c9fe56b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5c9fe56b
Branch: refs/heads/master
Commit: 5c9fe56b2256b76368add5d8700bcd2b203bad40
Parents: fccfc1c
Author: Ajay Yadava <aj...@gmail.com>
Authored: Fri Aug 7 19:42:35 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Fri Aug 7 20:03:58 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../falcon/entity/store/ConfigurationStore.java | 64 +++++++++++++++-----
2 files changed, 50 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/5c9fe56b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index be06b6f..2c16bd8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,8 @@ Trunk (Unreleased)
FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
IMPROVEMENTS
+ FALCON-1368 Improve Falcon server restart time(Sandeep Samudrala via Ajay Yadava)
+
FALCON-1361 Default end date should be now(Pragya Mittal via Ajay Yadava)
FALCON-1362 Colo option shouldn't be mandatory(Sandeep Samudrala via Ajay Yadava)
http://git-wip-us.apache.org/repos/asf/falcon/blob/5c9fe56b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 7b53ebb..e27187b 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -49,6 +49,9 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
/**
* Persistent store for falcon entities.
@@ -145,25 +148,54 @@ public final class ConfigurationStore implements FalconService {
}
if (shouldPersist) {
- try {
- for (EntityType type : ENTITY_LOAD_ORDER) {
- ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
- FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
- if (files != null) {
- for (FileStatus file : files) {
- String fileName = file.getPath().getName();
- String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop
- // ".xml"
- String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
- Entity entity = restore(type, entityName);
- entityMap.put(entityName, entity);
- onReload(entity);
+ for (final EntityType type : ENTITY_LOAD_ORDER) {
+ loadEntity(type);
+ }
+ }
+ }
+
+ private void loadEntity(final EntityType type) throws FalconException {
+ try {
+ final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
+ FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
+ if (files != null) {
+ final ExecutorService service = Executors.newFixedThreadPool(100);
+ for (final FileStatus file : files) {
+ service.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ String fileName = file.getPath().getName();
+ String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop
+ // ".xml"
+ String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
+ Entity entity = restore(type, entityName);
+ entityMap.put(entityName, entity);
+ } catch (IOException | FalconException e) {
+ LOG.error("Unable to restore entity of", file);
+ }
}
- }
+ });
+ }
+ service.shutdown();
+ if (service.awaitTermination(10, TimeUnit.MINUTES)) {
+ LOG.info("Restored Configurations for entity type: {} ", type.name());
+ } else {
+ LOG.warn("Time out happened while waiting for all threads to finish while restoring entities "
+ + "for type: {}", type.name());
+ }
+ // Checking if all entities were loaded
+ if (entityMap.size() != files.length) {
+ throw new FalconException("Unable to restore configurations for entity type " + type.name());
+ }
+ for (Entity entity : entityMap.values()){
+ onReload(entity);
}
- } catch (IOException e) {
- throw new FalconException("Unable to restore configurations", e);
}
+ } catch (IOException e) {
+ throw new FalconException("Unable to restore configurations", e);
+ } catch (InterruptedException e) {
+ throw new FalconException("Failed to restore configurations in 10 minutes for entity type " + type.name());
}
}