You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by aj...@apache.org on 2015/08/07 16:54:07 UTC

falcon git commit: FALCON-1368 Improve Falcon server restart time. Contributed by Sandeep Samudrala.

Repository: falcon
Updated Branches:
  refs/heads/master fccfc1c63 -> 5c9fe56b2


FALCON-1368 Improve Falcon server restart time. Contributed by Sandeep Samudrala.


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/5c9fe56b
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/5c9fe56b
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/5c9fe56b

Branch: refs/heads/master
Commit: 5c9fe56b2256b76368add5d8700bcd2b203bad40
Parents: fccfc1c
Author: Ajay Yadava <aj...@gmail.com>
Authored: Fri Aug 7 19:42:35 2015 +0530
Committer: Ajay Yadava <aj...@gmail.com>
Committed: Fri Aug 7 20:03:58 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../falcon/entity/store/ConfigurationStore.java | 64 +++++++++++++++-----
 2 files changed, 50 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/5c9fe56b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index be06b6f..2c16bd8 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,8 @@ Trunk (Unreleased)
     FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava)
     
   IMPROVEMENTS
+    FALCON-1368 Improve Falcon server restart time(Sandeep Samudrala via Ajay Yadava)
+
     FALCON-1361 Default end date should be now(Pragya Mittal via Ajay Yadava)
 
     FALCON-1362 Colo option shouldn't be mandatory(Sandeep Samudrala via Ajay Yadava)

http://git-wip-us.apache.org/repos/asf/falcon/blob/5c9fe56b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 7b53ebb..e27187b 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -49,6 +49,9 @@ import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Persistent store for falcon entities.
@@ -145,25 +148,54 @@ public final class ConfigurationStore implements FalconService {
         }
 
         if (shouldPersist) {
-            try {
-                for (EntityType type : ENTITY_LOAD_ORDER) {
-                    ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
-                    FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
-                    if (files != null) {
-                        for (FileStatus file : files) {
-                            String fileName = file.getPath().getName();
-                            String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop
-                            // ".xml"
-                            String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
-                            Entity entity = restore(type, entityName);
-                            entityMap.put(entityName, entity);
-                            onReload(entity);
+            for (final EntityType type : ENTITY_LOAD_ORDER) {
+                loadEntity(type);
+            }
+        }
+    }
+
+    private void loadEntity(final EntityType type) throws FalconException {
+        try {
+            final ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
+            FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*"));
+            if (files != null) {
+                final ExecutorService service = Executors.newFixedThreadPool(100);
+                for (final FileStatus file : files) {
+                    service.execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                String fileName = file.getPath().getName();
+                                String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop ".xml"
+                                String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
+                                Entity entity = restore(type, entityName);
+                                entityMap.put(entityName, entity);
+                            } catch (IOException | FalconException e) {
+                                LOG.error("Unable to restore entity from " + file.getPath(), e); // keep path and cause
+                            }
                         }
-                    }
+                    });
+                }
+                service.shutdown();
+                if (service.awaitTermination(10, TimeUnit.MINUTES)) {
+                    LOG.info("Restored Configurations for entity type: {} ", type.name());
+                } else {
+                    LOG.warn("Time out happened while waiting for all threads to finish while restoring entities "
+                            + "for type: {}", type.name());
+                }
+                // Checking if all entities were loaded
+                if (entityMap.size() != files.length) {
+                    throw new FalconException("Unable to restore configurations for entity type " + type.name());
+                }
+                for (Entity entity : entityMap.values()){
+                    onReload(entity);
                 }
-            } catch (IOException e) {
-                throw new FalconException("Unable to restore configurations", e);
             }
+        } catch (IOException e) {
+            throw new FalconException("Unable to restore configurations", e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // an interrupt is not a timeout: restore the flag for callers
+            throw new FalconException("Interrupted while restoring configurations for entity type " + type.name(), e);
         }
     }