You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2013/09/24 01:04:54 UTC

[6/8] git commit: FALCON-69 Exception from ConfigurationChangeListener should fail the API. Contributed by Shwetha GS

FALCON-69 Exception from ConfigurationChangeListener should fail the API. Contributed by Shwetha GS


Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/47111568
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/47111568
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/47111568

Branch: refs/heads/FALCON-85
Commit: 471115687b8598eafea996f734aa5bb07f7981c4
Parents: 5e8238b
Author: srikanth.sundarrajan <sr...@inmobi.com>
Authored: Mon Sep 23 22:01:30 2013 +0530
Committer: srikanth.sundarrajan <sr...@inmobi.com>
Committed: Mon Sep 23 22:01:30 2013 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  3 ++
 .../falcon/entity/ColoClusterRelation.java      |  2 +-
 .../falcon/entity/store/ConfigurationStore.java | 49 +++++++-------------
 .../apache/falcon/entity/v0/EntityGraph.java    |  4 +-
 .../org/apache/falcon/group/FeedGroupMap.java   |  4 +-
 .../service/ConfigurationChangeListener.java    |  2 +-
 .../entity/store/ConfigurationStoreTest.java    | 44 ++++++++++++++++--
 .../service/SharedLibraryHostingService.java    | 11 ++++-
 8 files changed, 74 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/47111568/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 60a0afb..7ddb0e5 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -11,6 +11,9 @@ Trunk (Unreleased)
     via Srikanth Sundarrajan)
 
   IMPROVEMENTS
+    FALCON-69 exception from ConfigurationChangeListener should fail 
+    the API. (Shwetha GS via Srikanth Sundarrajan)
+
     FALCON-63 Ability to ingest hadoop libs to falcon release package.
     (Suhas Vasu via Shwetha GS)
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/47111568/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
index 72dd952..e42efd7 100644
--- a/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
+++ b/common/src/main/java/org/apache/falcon/entity/ColoClusterRelation.java
@@ -51,7 +51,7 @@ public final class ColoClusterRelation implements ConfigurationChangeListener {
     }
 
     @Override
-    public void onAdd(Entity entity) {
+    public void onAdd(Entity entity, boolean ignoreFailure) {
         if (entity.getEntityType() != EntityType.CLUSTER) {
             return;
         }

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/47111568/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
index 4897c25..18ceb6e 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java
@@ -116,7 +116,7 @@ public final class ConfigurationStore implements FalconService {
                         String entityName = URLDecoder.decode(encodedEntityName, UTF_8);
                         Entity entity = restore(type, entityName);
                         entityMap.put(entityName, entity);
-                        onAdd(entity);
+                        onAdd(entity, true);
                     }
                 }
             }
@@ -129,6 +129,10 @@ public final class ConfigurationStore implements FalconService {
         listeners.add(listener);
     }
 
+    public void unregisterListener(ConfigurationChangeListener listener) {
+        listeners.remove(listener);
+    }
+
     /**
      * @param type   - EntityType that need to be published
      * @param entity - Reference to the Entity Object
@@ -138,8 +142,8 @@ public final class ConfigurationStore implements FalconService {
         try {
             if (get(type, entity.getName()) == null) {
                 persist(type, entity);
+                onAdd(entity, false);
                 dictionary.get(type).put(entity.getName(), entity);
-                onAdd(entity);
             } else {
                 throw new EntityAlreadyExistsException(
                         entity.toShortString() + " already registered with configuration store. "
@@ -155,8 +159,10 @@ public final class ConfigurationStore implements FalconService {
         try {
             if (get(type, entity.getName()) != null) {
                 persist(type, entity);
-                Entity oldEntity = dictionary.get(type).put(entity.getName(), entity);
+                ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type);
+                Entity oldEntity = entityMap.get(entity.getName());
                 onChange(oldEntity, entity);
+                entityMap.put(entity.getName(), entity);
             } else {
                 throw new FalconException(entity.toShortString() + " doesn't exist");
             }
@@ -166,10 +172,6 @@ public final class ConfigurationStore implements FalconService {
         AUDIT.info(type + "/" + entity.getName() + " is replaced into config store");
     }
 
-    public synchronized void rollbackUpdate(EntityType type, Entity entity) throws FalconException {
-        updateInternal(type, entity);
-    }
-
     public synchronized void update(EntityType type, Entity entity) throws FalconException {
         if (updatesInProgress.get() == entity) {
             updateInternal(type, entity);
@@ -178,13 +180,9 @@ public final class ConfigurationStore implements FalconService {
         }
     }
 
-    private void onAdd(Entity entity) {
+    private void onAdd(Entity entity, boolean ignoreFailure) throws FalconException {
         for (ConfigurationChangeListener listener : listeners) {
-            try {
-                listener.onAdd(entity);
-            } catch (Throwable e) {
-                LOG.warn("Encountered exception while notifying " + listener + entity.toShortString(), e);
-            }
+            listener.onAdd(entity, ignoreFailure);
         }
     }
 
@@ -255,7 +253,9 @@ public final class ConfigurationStore implements FalconService {
         if (entityMap.containsKey(name)) {
             try {
                 archive(type, name);
-                onRemove(entityMap.remove(name));
+                Entity entity = entityMap.get(name);
+                onRemove(entity);
+                entityMap.remove(name);
             } catch (IOException e) {
                 throw new StoreAccessException(e);
             }
@@ -265,30 +265,13 @@ public final class ConfigurationStore implements FalconService {
         return false;
     }
 
-    private void onRemove(Entity entity) {
+    private void onRemove(Entity entity) throws FalconException {
         for (ConfigurationChangeListener listener : listeners) {
-            try {
-                listener.onRemove(entity);
-            } catch (Throwable e) {
-                LOG.warn(
-                        "Encountered exception while notifying " + listener + "(" + entity.getEntityType() + ") "
-                                + entity.getName(),
-                        e);
-            }
+            listener.onRemove(entity);
         }
     }
 
     /**
-     * @param type     - Entity type that needs to be searched
-     * @param keywords - List of keywords to search for. only entities that have all
-     *                 the keywords being searched would be returned
-     * @return - Array of entity types
-     */
-    public Entity[] search(EntityType type, String... keywords) {
-        return null;
-    }
-
-    /**
      * @param type   - Entity type that is to be stored into persistent storage
      * @param entity - entity to persist. JAXB Annotated entity will be marshalled
      *               to the persistent store. The convention used for storing the

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/47111568/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
index 803fa9e..fff124b 100644
--- a/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
+++ b/common/src/main/java/org/apache/falcon/entity/v0/EntityGraph.java
@@ -69,7 +69,7 @@ public final class EntityGraph implements ConfigurationChangeListener {
     }
 
     @Override
-    public void onAdd(Entity entity) throws FalconException {
+    public void onAdd(Entity entity, boolean ignoreFailure) throws FalconException {
         Map<Node, Set<Node>> nodeEdges = null;
         switch (entity.getEntityType()) {
         case PROCESS:
@@ -124,7 +124,7 @@ public final class EntityGraph implements ConfigurationChangeListener {
     @Override
     public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
         onRemove(oldEntity);
-        onAdd(newEntity);
+        onAdd(newEntity, false);
     }
 
     private Map<Node, Set<Node>> getEdgesFor(Process process) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/47111568/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
index ed44b48..f0d2e0b 100644
--- a/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
+++ b/common/src/main/java/org/apache/falcon/group/FeedGroupMap.java
@@ -54,7 +54,7 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
     }
 
     @Override
-    public void onAdd(Entity entity) throws FalconException {
+    public void onAdd(Entity entity, boolean ignoreFailure) throws FalconException {
 
         if (entity.getEntityType().equals(EntityType.FEED)) {
             Feed feed = (Feed) entity;
@@ -91,7 +91,7 @@ public final class FeedGroupMap implements ConfigurationChangeListener {
         throws FalconException {
 
         onRemove(oldEntity);
-        onAdd(newEntity);
+        onAdd(newEntity, false);
     }
 
     private void addGroups(String feed, Set<FeedGroup> groups) {

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/47111568/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java b/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java
index 56953ad..da790fb 100644
--- a/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java
+++ b/common/src/main/java/org/apache/falcon/service/ConfigurationChangeListener.java
@@ -26,7 +26,7 @@ import org.apache.falcon.entity.v0.Entity;
  */
 public interface ConfigurationChangeListener {
 
-    void onAdd(Entity entity) throws FalconException;
+    void onAdd(Entity entity, boolean ignoreFailure) throws FalconException;
 
     void onRemove(Entity entity) throws FalconException;
 

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/47111568/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
index 86298cc..89c031d 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/ConfigurationStoreTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.falcon.entity.store;
 
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.service.ConfigurationChangeListener;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -40,6 +43,24 @@ public class ConfigurationStoreTest {
     private static final Logger LOG = Logger.getLogger(ConfigurationStoreTest.class);
 
     private ConfigurationStore store = ConfigurationStore.get();
+    private TestListener listener = new TestListener();
+
+    private class TestListener implements ConfigurationChangeListener {
+        @Override
+        public void onAdd(Entity entity, boolean ignoreFailure) throws FalconException {
+            throw new FalconException("For test");
+        }
+
+        @Override
+        public void onRemove(Entity entity) throws FalconException {
+            throw new FalconException("For test");
+        }
+
+        @Override
+        public void onChange(Entity oldEntity, Entity newEntity) throws FalconException {
+            throw new FalconException("For test");
+        }
+    }
 
     @Test
     public void testPublish() throws Exception {
@@ -48,6 +69,16 @@ public class ConfigurationStoreTest {
         store.publish(EntityType.PROCESS, process);
         Process p = store.get(EntityType.PROCESS, "hello");
         Assert.assertEquals(p, process);
+
+        store.registerListener(listener);
+        process.setName("world");
+        try {
+            store.publish(EntityType.PROCESS, process);
+            throw new AssertionError("Expected exception");
+        } catch(FalconException expected) {
+            //expected
+        }
+        store.unregisterListener(listener);
     }
 
     @Test
@@ -66,11 +97,16 @@ public class ConfigurationStoreTest {
         store.remove(EntityType.PROCESS, "remove");
         p = store.get(EntityType.PROCESS, "remove");
         Assert.assertNull(p);
-    }
 
-    @Test
-    public void testSearch() throws Exception {
-        //TODO
+        store.publish(EntityType.PROCESS, process);
+        store.registerListener(listener);
+        try {
+            store.remove(EntityType.PROCESS, "remove");
+            throw new AssertionError("Expected exception");
+        } catch(FalconException expected) {
+            //expected
+        }
+        store.unregisterListener(listener);
     }
 
     @BeforeSuite

http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/47111568/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index 30856f6..c47ec01 100644
--- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -139,7 +139,7 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
     }
 
     @Override
-    public void onAdd(Entity entity) throws FalconException {
+    public void onAdd(Entity entity, boolean ignoreFailure) throws FalconException {
         if (entity.getEntityType() != EntityType.CLUSTER) {
             return;
         }
@@ -149,7 +149,14 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener
             return;
         }
 
-        addLibsTo(cluster);
+        try {
+            addLibsTo(cluster);
+        } catch(FalconException e) {
+            if (!ignoreFailure) {
+                throw e;
+            }
+            LOG.warn("Failed to copy shared libraries to cluster", e);
+        }
     }
 
     @Override