You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/02/02 12:09:40 UTC

falcon git commit: FALCON-2267 Definition api fails if resources are empty

Repository: falcon
Updated Branches:
  refs/heads/master 993861c85 -> 0825d80a6


FALCON-2267 Definition api fails if resources are empty

This is an extension of sandeepSamudrala's work on PR 353, completed on his behalf while he is out on vacation.

Dev testing done:
`IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -enumerate
[
  {
    "name": "sample",
    "type": "Custom extension",
    "location": "hdfs://192.168.138.236:8020/tmp/extensions/extension-example"
  },
  ...
  {
    "name": "hive-mirroring",
    "type": "Trusted extension",
    "description": "This extension implements replicating hive metadata and data from one Hadoop cluster to another Hadoop cluster.",
    "location": "file:/Users/pallavi.rao/falcon/falcon-0.11-SNAPSHOT/extensions/hive-mirroring"
  }
]
IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -definition -extensionName hive-mirroring

{
    "shortDescription":"This extension implements replicating hive metadata and data from one Hadoop cluster to another Hadoop cluster.",
    "properties":[
        {
            "propertyName":"jobName",
            "required":true,
            "description":"Unique job name",
            "example":"hive-monthly-sales-dr"
        },
...
    ]
}
IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -definition -extensionName sample
Contents of file config:

Contents of file config2:
<workflow-app xmlns="uri:oozie:workflow:0.1" name="merlin-workflow">
 ...

IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -describe -extensionName sample
Extension Test

IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -describe -extensionName hive-mirroring
.....
Hive Mirroring Extension

Overview
Falcon provides feature to replicate Hive metadata and data events from source cluster to destination cluster.
...`

Author: Pallavi Rao <pa...@inmobi.com>

Reviewers: @PracheerAgarwal, @sandeepSamudrala

Closes #356 from pallavi-rao/2267


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0825d80a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0825d80a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0825d80a

Branch: refs/heads/master
Commit: 0825d80a638652bc68119425e910f969602c83c6
Parents: 993861c
Author: Pallavi Rao <pa...@inmobi.com>
Authored: Thu Feb 2 17:39:23 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Feb 2 17:39:23 2017 +0530

----------------------------------------------------------------------
 .../apache/falcon/cli/FalconExtensionCLI.java   |  1 -
 .../falcon/extensions/store/ExtensionStore.java | 87 ++++++++-----------
 .../extensions/store/ExtensionStoreTest.java    |  5 +-
 .../resource/AbstractExtensionManager.java      | 11 +--
 .../resource/proxy/ExtensionManagerProxy.java   | 89 ++++++++++----------
 5 files changed, 86 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 5d44128..293df23 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -92,7 +92,6 @@ public class FalconExtensionCLI extends FalconCLI{
         } else if (optionsList.contains(DEFINITION_OPT)) {
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             result = client.getExtensionDefinition(extensionName).getMessage();
-            result = prettyPrintJson(result);
         } else if (optionsList.contains(DESCRIBE_OPT)) {
             validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
             result = client.getExtensionDescription(extensionName).getMessage();

http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
index 1d71651..ed42b6b 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -18,6 +18,15 @@
 
 package org.apache.falcon.extensions.store;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.lang.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.entity.parser.ValidationException;
@@ -41,16 +50,6 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Store for Falcon extensions.
  */
@@ -114,8 +113,9 @@ public final class ExtensionStore {
     }
 
     private String getShortDescription(final String extensionName) throws FalconException {
-        String content = getResource(extensionName, extensionName.toLowerCase()
-                + EXTENSION_PROPERTY_JSON_SUFFIX);
+        String location = storePath.toString() + "/" + extensionName + "/META/"
+                + extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX;
+        String content = getResource(location);
         String description;
         try {
             JSONObject jsonObject = new JSONObject(content);
@@ -141,40 +141,6 @@ public final class ExtensionStore {
         }
     }
 
-    private Map<String, String> getExtensionArtifacts(final String extensionName) throws
-            FalconException {
-        Map<String, String> extensionFileMap = new HashMap<>();
-        Path extensionPath;
-        try {
-            RemoteIterator<LocatedFileStatus> fileStatusListIterator;
-            if (AbstractExtension.isExtensionTrusted(extensionName)) {
-                extensionPath = new Path(storePath, extensionName.toLowerCase());
-                fileStatusListIterator = fs.listFiles(extensionPath, true);
-            } else {
-                ExtensionBean extensionBean = metaStore.getDetail(extensionName);
-                if (null == extensionBean) {
-                    throw new StoreAccessException("Extension not found:" + extensionName);
-                }
-                extensionPath = new Path(extensionBean.getLocation());
-                FileSystem fileSystem = getHdfsFileSystem(extensionBean.getLocation());
-                fileStatusListIterator = fileSystem.listFiles(extensionPath, true);
-            }
-
-            if (!fileStatusListIterator.hasNext()) {
-                throw new StoreAccessException(" For extension " + extensionName
-                        + " there are no artifacts at the extension store path " + storePath);
-            }
-            while (fileStatusListIterator.hasNext()) {
-                LocatedFileStatus fileStatus = fileStatusListIterator.next();
-                Path filePath = fileStatus.getPath();
-                extensionFileMap.put(filePath.getName(), filePath.toString());
-            }
-        } catch (IOException e) {
-            throw new StoreAccessException(e);
-        }
-        return extensionFileMap;
-    }
-
 
     public Map<String, String> getExtensionResources(final String extensionName) throws StoreAccessException {
         Map<String, String> extensionFileMap = new HashMap<>();
@@ -340,7 +306,7 @@ public final class ExtensionStore {
         }
         FileStatus[] propStatus;
         try {
-            propStatus = fileSystem.listStatus(new Path(uri.getPath() + "/META"));
+            propStatus = fileSystem.listStatus(new Path(uri.getPath() , "META"));
             if (propStatus.length <= 0) {
                 throw new ValidationException("No properties file is not present in the " + uri.getPath() + "/META"
                         + " structure.");
@@ -360,13 +326,30 @@ public final class ExtensionStore {
         return "Extension :" + extensionName + " registered successfully.";
     }
 
-    public String getResource(final String extensionName, final String resourceName) throws FalconException {
-        Map<String, String> resources = getExtensionArtifacts(extensionName);
-        if (resources.isEmpty()) {
-            throw new StoreAccessException("No extension resources found for " + extensionName);
+    public String getResource(final String extensionResourcePath)
+        throws FalconException {
+        StringBuilder definition = new StringBuilder();
+        Path resourcePath = new Path(extensionResourcePath);
+        FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(resourcePath.toUri());
+        try {
+            if (fileSystem.isFile(resourcePath)) {
+                definition.append(getExtensionResource(extensionResourcePath.toString()));
+            } else {
+                RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem.listFiles(resourcePath, false);
+                while (fileStatusListIterator.hasNext()) {
+                    LocatedFileStatus fileStatus = fileStatusListIterator.next();
+                    Path filePath = fileStatus.getPath();
+                    definition.append("Contents of file ").append(filePath.getName()).append(":\n");
+                    definition.append(getExtensionResource(filePath.toString())).append("\n \n");
+                }
+            }
+        } catch (IOException e) {
+            LOG.error("Exception while getting file(s) with path : " + extensionResourcePath, e);
+            throw new StoreAccessException(e);
         }
 
-        return getExtensionResource(resources.get(resourceName));
+        return definition.toString();
+
     }
 
     public Path getExtensionStorePath() {

http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
index b2fac5f..1c82db1 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -50,7 +50,7 @@ import java.util.Map;
 public class ExtensionStoreTest extends AbstractTestExtensionStore {
     private static Map<String, String> resourcesMap;
     private static JailedFileSystem fs;
-    protected static final String EXTENSION_PATH = "/projects/falcon/extension";
+    protected static final String EXTENSION_PATH = "/projects/falcon/extension/";
     private static final String STORAGE_URL = "jail://global:00";
 
     @BeforeClass
@@ -140,8 +140,7 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
         createMETA(extensionPath);
         store = ExtensionStore.get();
         store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath, "test desc", "falconUser");
-        Assert.assertTrue(store.getResource("toBeDeleted", "README").equals("README"));
-        store.getResource("toBeDeleted", "README");
+        Assert.assertTrue(store.getResource(STORAGE_URL + extensionPath + "/README").equals("README"));
         store.deleteExtension("toBeDeleted", "falconUser");
         ExtensionMetaStore metaStore = new ExtensionMetaStore();
         Assert.assertEquals(metaStore.getAllExtensions().size(), 0);

http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 2d4f915..a3b6ef1 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -268,13 +268,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
     }
 
     protected static void checkIfExtensionIsEnabled(String extensionName) {
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        ExtensionBean extensionBean = metaStore.getDetail(extensionName);
-        if (extensionBean == null) {
-            LOG.error("Extension not found: " + extensionName);
-            throw FalconWebException.newAPIException("Extension not found:" + extensionName,
-                    Response.Status.NOT_FOUND);
-        }
+        ExtensionBean extensionBean = getExtensionIfExists(extensionName);
         if (!extensionBean.getStatus().equals(ExtensionStatus.ENABLED)) {
             LOG.error("Extension: " + extensionName + " is in disabled state.");
             throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
@@ -282,7 +276,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
         }
     }
 
-    protected static void checkIfExtensionExists(String extensionName) {
+    protected static ExtensionBean getExtensionIfExists(String extensionName) {
         ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
         ExtensionBean extensionBean = metaStore.getDetail(extensionName);
         if (extensionBean == null) {
@@ -290,6 +284,7 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
             throw FalconWebException.newAPIException("Extension not found:" + extensionName,
                     Response.Status.NOT_FOUND);
         }
+        return extensionBean;
     }
 
     protected static void checkIfExtensionJobNameExists(String jobName, String extensionName) {

http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index c387f08..61aa39a 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -20,6 +20,34 @@ package org.apache.falcon.resource.proxy;
 
 import com.sun.jersey.multipart.FormDataBodyPart;
 import com.sun.jersey.multipart.FormDataParam;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.JAXBException;
 import org.apache.commons.io.IOUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
@@ -30,52 +58,24 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.extensions.Extension;
+import org.apache.falcon.extensions.ExtensionProperties;
 import org.apache.falcon.extensions.ExtensionService;
 import org.apache.falcon.extensions.ExtensionType;
-import org.apache.falcon.extensions.ExtensionProperties;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
 import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.falcon.persistence.ExtensionBean;
 import org.apache.falcon.persistence.ExtensionJobsBean;
-import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.resource.APIResult;
 import org.apache.falcon.resource.AbstractExtensionManager;
 import org.apache.falcon.resource.ExtensionInstanceList;
 import org.apache.falcon.resource.ExtensionJobList;
+import org.apache.falcon.resource.InstancesResult;
 import org.apache.falcon.security.CurrentUser;
 import org.apache.falcon.service.Services;
 import org.apache.falcon.util.DeploymentUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.JAXBException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.Properties;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.SortedMap;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-
 /**
  * Jersey Resource for extension job operations.
  */
@@ -101,7 +101,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
             @DefaultValue(ASCENDING_SORT_ORDER) @QueryParam("sortOrder") String sortOrder,
             @DefaultValue("") @QueryParam("doAs") String doAsUser) {
         checkIfExtensionServiceIsEnabled();
-        checkIfExtensionExists(extensionName);
+        getExtensionIfExists(extensionName);
         try {
             return super.getExtensionJobs(extensionName, sortOrder, doAsUser);
         } catch (Throwable e) {
@@ -341,14 +341,7 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
     }
 
     private ExtensionType getExtensionType(String extensionName) {
-        ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
-        ExtensionBean extensionDetails = metaStore.getDetail(extensionName);
-        if (extensionDetails == null) {
-            // return failure if the extension job doesn't exist
-            LOG.error("Extension not found: " + extensionName);
-            throw FalconWebException.newAPIException("Extension not found:" + extensionName,
-                    Response.Status.NOT_FOUND);
-        }
+        ExtensionBean extensionDetails = getExtensionIfExists(extensionName);
         return extensionDetails.getExtensionType();
     }
 
@@ -623,9 +616,10 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
     public APIResult getExtensionDescription(
             @PathParam("extension-name") String extensionName) {
         checkIfExtensionServiceIsEnabled();
-        validateExtensionName(extensionName);
+        ExtensionBean extensionBean = getExtensionIfExists(extensionName);
         try {
-            return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionName, README));
+            String extensionResourcePath = extensionBean.getLocation() + File.separator +  README;
+            return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionResourcePath));
         } catch (FalconException e) {
             throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
         } catch (Throwable e) {
@@ -694,9 +688,18 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
     public APIResult getExtensionDefinition(
             @PathParam("extension-name") String extensionName) {
         checkIfExtensionServiceIsEnabled();
+        ExtensionBean extensionBean = getExtensionIfExists(extensionName);
         try {
-            return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionName,
-                    extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX));
+            ExtensionType extensionType = extensionBean.getExtensionType();
+            String extensionResourcePath;
+            if (ExtensionType.TRUSTED.equals(extensionType)) {
+                extensionResourcePath = extensionBean.getLocation() + "/META/"
+                        + extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX;
+            } else {
+                extensionResourcePath = extensionBean.getLocation() + "/META";
+            }
+            return new APIResult(APIResult.Status.SUCCEEDED,
+                    ExtensionStore.get().getResource(extensionResourcePath));
         } catch (FalconException e) {
             throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
         } catch (Throwable e) {