You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by su...@apache.org on 2015/03/30 21:24:48 UTC

falcon git commit: FALCON-1125 Feed Lookup API doesnt work via prism. Contributed by Ajay Yadava

Repository: falcon
Updated Branches:
  refs/heads/master 6f78180ff -> 0aeb6c89a


FALCON-1125 Feed Lookup API doesnt work via prism. Contributed by Ajay Yadava


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0aeb6c89
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0aeb6c89
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0aeb6c89

Branch: refs/heads/master
Commit: 0aeb6c89a7fa16ebe8183c7bf66bfa8187b563a6
Parents: 6f78180
Author: Suhas Vasu <su...@inmobi.com>
Authored: Tue Mar 31 00:54:25 2015 +0530
Committer: Suhas Vasu <su...@inmobi.com>
Committed: Tue Mar 31 00:54:25 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  5 +-
 .../falcon/resource/FeedLookupResult.java       | 19 ++++++++
 .../falcon/entity/store/FeedLocationStore.java  | 25 +++++-----
 .../entity/store/FeedLocationStoreTest.java     | 21 +++++++--
 .../falcon/resource/AbstractEntityManager.java  |  2 +-
 .../proxy/SchedulableEntityManagerProxy.java    | 48 ++++++++++++++++----
 .../resource/SchedulableEntityManager.java      |  3 +-
 7 files changed, 96 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c136a86..4c7ba2b 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -123,11 +123,14 @@ Trunk (Unreleased)
    (Suhas vasu)
 
   BUG FIXES
+   FALCON-1125 Feed Lookup API doesnt work via prism
+   (Ajay Yadava via Suhas Vasu)
+
    FALCON-1119 Instance logs option is not returning the log location
    (Suhas Vasu)
 
    FALCON-1100 UI : Failed to load data. 404 not found
-   (Pallavi Rao via Suhas Vasu)
+   (Sowmya Ramesh via Suhas Vasu)
 
    FALCON-1123 Stacktrace printed by Falcon CLI is not useful to user
    (Pallavi Rao via Suhas Vasu)

http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java b/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java
index 1198827..124feba 100644
--- a/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/FeedLookupResult.java
@@ -54,6 +54,25 @@ public class FeedLookupResult extends APIResult {
         this.elements = elements;
     }
 
+
+    @Override
+    public Object[] getCollection() {
+        return getElements();
+    }
+
+    @Override
+    public void setCollection(Object[] items) {
+        if (items == null) {
+            setElements(new FeedProperties[0]);
+        } else {
+            FeedProperties[] newInstances = new FeedProperties[items.length];
+            for (int index = 0; index < items.length; index++) {
+                newInstances[index] = (FeedProperties)items[index];
+            }
+            setElements(newInstances);
+        }
+    }
+
     @Override
     public String toString() {
         StringBuilder buffer = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
index c8ae0f5..fefd74b 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/FeedLocationStore.java
@@ -28,6 +28,7 @@ import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.feed.Location;
 import org.apache.falcon.resource.FeedLookupResult;
 import org.apache.falcon.service.ConfigurationChangeListener;
+import org.apache.falcon.util.DeploymentUtil;
 import org.apache.falcon.util.FalconRadixUtils;
 import org.apache.falcon.util.RadixTree;
 import org.slf4j.Logger;
@@ -83,17 +84,19 @@ public final class FeedLocationStore implements ConfigurationChangeListener {
         if (entity.getEntityType() == EntityType.FEED){
             Feed feed = (Feed) entity;
             List<Cluster> clusters = feed.getClusters().getClusters();
-            for(Cluster cluster: clusters){
-                List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed,
-                        cluster.getName()), feed);
-                if (clusterSpecificLocations != null) {
-                    for(Location location: clusterSpecificLocations){
-                        if (location != null && StringUtils.isNotBlank(location.getPath())){
-                            FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(feed.getName(),
-                                    location.getType(), cluster.getName());
-                            store.insert(StringUtils.trim(location.getPath()), value);
-                            LOG.debug("Inserted location: {} for feed: {} and cluster: {}",
-                                    location.getPath(), feed.getName(), cluster.getName());
+            for(Cluster cluster: clusters) {
+                if (DeploymentUtil.getCurrentClusters().contains(cluster.getName())) {
+                    List<Location> clusterSpecificLocations = FeedHelper.getLocations(FeedHelper.getCluster(feed,
+                            cluster.getName()), feed);
+                    if (clusterSpecificLocations != null) {
+                        for (Location location : clusterSpecificLocations) {
+                            if (location != null && StringUtils.isNotBlank(location.getPath())) {
+                                FeedLookupResult.FeedProperties value = new FeedLookupResult.FeedProperties(
+                                        feed.getName(), location.getType(), cluster.getName());
+                                store.insert(StringUtils.trim(location.getPath()), value);
+                                LOG.debug("Inserted location: {} for feed: {} and cluster: {}",
+                                        location.getPath(), feed.getName(), cluster.getName());
+                            }
                         }
                     }
                 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
index 611c205..4c4b7f3 100644
--- a/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
+++ b/common/src/test/java/org/apache/falcon/entity/store/FeedLocationStoreTest.java
@@ -20,6 +20,7 @@ package org.apache.falcon.entity.store;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.AbstractTestBase;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.CatalogTable;
 import org.apache.falcon.entity.v0.feed.Cluster;
@@ -45,7 +46,7 @@ import java.util.Collection;
 /**
  * Tests for FeedLocationStore.
  */
-public class FeedLocationStoreTest {
+public class FeedLocationStoreTest extends AbstractTestBase {
     private ConfigurationStore store;
 
 
@@ -57,8 +58,11 @@ public class FeedLocationStoreTest {
         FileUtils.deleteDirectory(new File(location));
 
         cleanupStore();
+        String listeners = StartupProperties.get().getProperty("configstore.listeners");
         StartupProperties.get().setProperty("configstore.listeners",
-                "org.apache.falcon.entity.store.FeedLocationStore");
+                listeners.replace("org.apache.falcon.service.SharedLibraryHostingService", ""));
+//        StartupProperties.get().setProperty("configstore.listeners",
+//                "org.apache.falcon.entity.store.FeedLocationStore");
         store = ConfigurationStore.get();
         store.init();
 
@@ -68,6 +72,7 @@ public class FeedLocationStoreTest {
     @BeforeMethod
     public void setUp() throws FalconException{
         cleanupStore();
+        createClusters();
     }
 
     @AfterMethod
@@ -200,7 +205,7 @@ public class FeedLocationStoreTest {
         return location;
     }
 
-    private void cleanupStore() throws FalconException {
+    protected void cleanupStore() throws FalconException {
         store = ConfigurationStore.get();
         for (EntityType type : EntityType.values()) {
             Collection<String> entities = store.getEntities(type);
@@ -245,4 +250,14 @@ public class FeedLocationStoreTest {
 
         return clusters;
     }
+
+    private void createClusters() throws FalconException {
+        String[] clusterNames = {"cluster1WithLocations", "cluster2WithLocations", "blankCluster1", "blankCluster2"};
+        for (String name : clusterNames) {
+            org.apache.falcon.entity.v0.cluster.Cluster cluster = new org.apache.falcon.entity.v0.cluster.Cluster();
+            cluster.setName(name);
+            cluster.setColo("default");
+            store.publish(EntityType.CLUSTER, cluster);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 8c32469..c0df4d6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -952,7 +952,7 @@ public abstract class AbstractEntityManager {
      * @param instancePath location of the data
      * @return Feed Name, type of the data and cluster name.
      */
-    public FeedLookupResult reverseLookup(String type, String instancePath) {
+    public APIResult reverseLookup(String type, String instancePath) {
         try {
             EntityType entityType = EntityType.getEnum(type);
             if (entityType != EntityType.FEED) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 1c365ab..8bfc099 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -42,6 +42,7 @@ import javax.ws.rs.*;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -492,33 +493,62 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
     public FeedLookupResult reverseLookup(
             @Dimension("type") @PathParam("type") final String type,
             @Dimension("path") @QueryParam("path") final String path) {
-        return super.reverseLookup(type, path);
+        String entity = "DummyEntity"; // A dummy entity name to get around
+        return new EntityProxy<FeedLookupResult>(type, entity, FeedLookupResult.class) {
+            @Override
+            protected Set<String> getColosToApply() {
+                return getAllColos();
+            }
+
+            @Override
+            protected FeedLookupResult doExecute(String colo) throws FalconException {
+                return getEntityManager(colo).invoke("reverseLookup", type, path);
+            }
+        }.execute();
+
     }
     //RESUME CHECKSTYLE CHECK ParameterNumberCheck
 
-    private abstract class EntityProxy {
+    private abstract class EntityProxy<T extends APIResult> {
+        private final Class<T> clazz;
         private String type;
         private String name;
 
-        public EntityProxy(String type, String name) {
+        public EntityProxy(String type, String name, Class<T> resultClazz) {
+            this.clazz = resultClazz;
             this.type = type;
             this.name = name;
         }
 
-        public APIResult execute() {
+
+        private T getResultInstance(APIResult.Status status, String message) {
+            try {
+                Constructor<T> constructor = clazz.getConstructor(APIResult.Status.class, String.class);
+                return constructor.newInstance(status, message);
+            } catch (Exception e) {
+                throw new FalconRuntimException("Unable to consolidate result.", e);
+            }
+        }
+
+        public EntityProxy(String type, String name) {
+            this(type, name, (Class<T>) APIResult.class);
+        }
+
+        public T execute() {
             Set<String> colos = getColosToApply();
 
-            Map<String, APIResult> results = new HashMap<String, APIResult>();
+            Map<String, T> results = new HashMap();
 
             for (String colo : colos) {
                 try {
                     results.put(colo, doExecute(colo));
                 } catch (FalconException e) {
-                    results.put(colo,
-                            new APIResult(APIResult.Status.FAILED, e.getClass().getName() + "::" + e.getMessage()));
+                    results.put(colo, getResultInstance(APIResult.Status.FAILED, e.getClass().getName() + "::"
+                            + e.getMessage()));
                 }
             }
-            APIResult finalResult = consolidateResult(results, APIResult.class);
+
+            T finalResult = consolidateResult(results, clazz);
             if (finalResult.getStatus() != APIResult.Status.SUCCEEDED) {
                 throw FalconWebException.newException(finalResult, Response.Status.BAD_REQUEST);
             } else {
@@ -530,6 +560,6 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
             return getApplicableColos(type, name);
         }
 
-        protected abstract APIResult doExecute(String colo) throws FalconException;
+        protected abstract T doExecute(String colo) throws FalconException;
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/0aeb6c89/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 52adb5f..261a9a9 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -166,8 +166,7 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
     @Path("lookup/{type}/")
     @Produces(MediaType.APPLICATION_JSON)
     @Monitored(event = "reverse-lookup")
-    public FeedLookupResult reverseLookup(
-            @Context HttpServletRequest request,
+    public APIResult reverseLookup(
             @Dimension("type") @PathParam("type") String type,
             @Dimension("path") @QueryParam("path") String instancePath) {
         return super.reverseLookup(type, instancePath);