You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pr...@apache.org on 2017/01/02 08:52:39 UTC
falcon git commit: FALCON-2205 describe for non trusted recipe
Repository: falcon
Updated Branches:
refs/heads/master 42d379463 -> ccc343fa8
FALCON-2205 describe for non trusted recipe
Author: Praveen Adlakha <ad...@gmail.com>
Reviewers: @pallavi
Closes #312 from PraveenAdlakha/2205 and squashes the following commits:
7ab62c0 [Praveen Adlakha] checkstyle issue fixed
229506c [Praveen Adlakha] comments addressed and test cases added
daf3b48 [Praveen Adlakha] WIP
5ccf8ed [Praveen Adlakha] FALCON-2205 describe for non trusted recipe
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/ccc343fa
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/ccc343fa
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/ccc343fa
Branch: refs/heads/master
Commit: ccc343fa84bcb257f69eb64b942e58eae2efd504
Parents: 42d3794
Author: Praveen Adlakha <ad...@gmail.com>
Authored: Mon Jan 2 14:22:19 2017 +0530
Committer: Praveen Adlakha <ad...@gmail.com>
Committed: Mon Jan 2 14:22:19 2017 +0530
----------------------------------------------------------------------
.../entity/store/StoreAccessException.java | 4 ++
.../falcon/extensions/store/ExtensionStore.java | 68 ++++++++++++++------
.../extensions/store/ExtensionStoreTest.java | 14 +++-
3 files changed, 65 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/ccc343fa/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
index 318dc2e..96f60ea 100644
--- a/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
+++ b/common/src/main/java/org/apache/falcon/entity/store/StoreAccessException.java
@@ -35,4 +35,8 @@ public class StoreAccessException extends FalconException {
public StoreAccessException(Exception e) {
super(e);
}
+
+ public StoreAccessException(String message){
+ super(message);
+ }
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/ccc343fa/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
index 8e4acbe..13ff2d1 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -26,6 +26,7 @@ import org.apache.falcon.extensions.AbstractExtension;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -96,7 +97,7 @@ public final class ExtensionStore {
private void initializeDbTable() {
try {
metaStore.deleteExtensionsOfType(ExtensionType.TRUSTED);
- List<String> extensions = getExtensions();
+ List<String> extensions = getTrustedExtensions();
for (String extension : extensions) {
ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extension)
? ExtensionType.TRUSTED : ExtensionType.CUSTOM;
@@ -141,19 +142,29 @@ public final class ExtensionStore {
}
}
- public Map<String, String> getExtensionArtifacts(final String extensionName) throws StoreAccessException {
+ public Map<String, String> getExtensionArtifacts(final String extensionName) throws
+ FalconException {
Map<String, String> extensionFileMap = new HashMap<>();
+ Path extensionPath;
try {
- Path extensionPath = new Path(storePath, extensionName.toLowerCase());
- RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(extensionPath, true);
+ RemoteIterator<LocatedFileStatus> fileStatusListIterator;
+ if (AbstractExtension.isExtensionTrusted(extensionName)){
+ extensionPath = new Path(storePath, extensionName.toLowerCase());
+ fileStatusListIterator = fs.listFiles(extensionPath, true);
+ }else{
+ ExtensionBean extensionBean = metaStore.getDetail(extensionName);
+ extensionPath = new Path(extensionBean.getLocation());
+ FileSystem fileSystem = getHdfsFileSystem(extensionBean.getLocation());
+ fileStatusListIterator = fileSystem.listFiles(extensionPath, true);
+ }
if (!fileStatusListIterator.hasNext()) {
- throw new StoreAccessException(new Exception(" For extension " + extensionName
- + " there are no artifacts at the extension store path " + storePath));
+ throw new StoreAccessException(" For extension " + extensionName
+ + " there are no artifacts at the extension store path " + storePath);
}
while (fileStatusListIterator.hasNext()) {
LocatedFileStatus fileStatus = fileStatusListIterator.next();
- Path filePath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath());
+ Path filePath = fileStatus.getPath();
extensionFileMap.put(filePath.getName(), filePath.toString());
}
} catch (IOException e) {
@@ -162,6 +173,8 @@ public final class ExtensionStore {
return extensionFileMap;
}
+
+
public Map<String, String> getExtensionResources(final String extensionName) throws StoreAccessException {
Map<String, String> extensionFileMap = new HashMap<>();
try {
@@ -178,8 +191,8 @@ public final class ExtensionStore {
}
if (resourcesPath == null) {
- throw new StoreAccessException(new Exception(" For extension " + extensionName
- + " there is no " + RESOURCES_DIR + "at the extension store path " + storePath));
+ throw new StoreAccessException(" For extension " + extensionName
+ + " there is no " + RESOURCES_DIR + "at the extension store path " + storePath);
}
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(resourcesPath, true);
while (fileStatusListIterator.hasNext()) {
@@ -219,24 +232,31 @@ public final class ExtensionStore {
}
}
- public String getExtensionResource(final String resourcePath) throws StoreAccessException {
+ public String getExtensionResource(final String resourcePath) throws FalconException {
if (StringUtils.isBlank(resourcePath)) {
- throw new StoreAccessException(new Exception("Resource path cannot be null or empty"));
+ throw new StoreAccessException("Resource path cannot be null or empty");
}
try {
Path resourceFile = new Path(resourcePath);
+ InputStream data;
ByteArrayOutputStream writer = new ByteArrayOutputStream();
- InputStream data = fs.open(resourceFile);
- IOUtils.copyBytes(data, writer, fs.getConf(), true);
+ if (resourcePath.startsWith("file")){
+ data = fs.open(resourceFile);
+ IOUtils.copyBytes(data, writer, fs.getConf(), true);
+ }else{
+ FileSystem fileSystem = getHdfsFileSystem(resourcePath);
+ data = fileSystem.open(resourceFile);
+ IOUtils.copyBytes(data, writer, fileSystem.getConf(), true);
+ }
return writer.toString();
} catch (IOException e) {
throw new StoreAccessException(e);
}
}
- public List<String> getExtensions() throws StoreAccessException {
+ public List<String> getTrustedExtensions() throws StoreAccessException {
List<String> extensionList = new ArrayList<>();
try {
FileStatus[] fileStatuses = fs.listStatus(storePath);
@@ -276,6 +296,19 @@ public final class ExtensionStore {
throw new ValidationException(msg);
}
}
+ private FileSystem getHdfsFileSystem(String path) throws FalconException {
+ Configuration conf = new Configuration();
+ URI uri;
+ try {
+ uri = new URI(path);
+ } catch (URISyntaxException e){
+ LOG.error("Exception : ", e);
+ throw new FalconException(e);
+ }
+ conf.set("fs.default.name", uri.getScheme() + "://" + uri.getAuthority());
+ return HadoopClientFactory.get().createFalconFileSystem(uri);
+ }
+
public String registerExtension(final String extensionName, final String path, final String description,
String extensionOwner) throws URISyntaxException, FalconException {
@@ -285,7 +318,7 @@ public final class ExtensionStore {
assertURI("Authority", uri.getAuthority());
assertURI("Path", uri.getPath());
conf.set("fs.defaultFS", uri.getScheme() + "://" + uri.getAuthority());
- FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(uri);
+ FileSystem fileSystem = getHdfsFileSystem(path);
try {
fileSystem.listStatus(new Path(uri.getPath() + "/README"));
} catch (IOException e) {
@@ -328,11 +361,10 @@ public final class ExtensionStore {
LOG.info("Extension :" + extensionName + " registered successfully.");
return "Extension :" + extensionName + " registered successfully.";
}
-
- public String getResource(final String extensionName, final String resourceName) throws StoreAccessException {
+ public String getResource(final String extensionName, final String resourceName) throws FalconException {
Map<String, String> resources = getExtensionArtifacts(extensionName);
if (resources.isEmpty()) {
- throw new StoreAccessException(new Exception("No extension resources found for " + extensionName));
+ throw new StoreAccessException("No extension resources found for " + extensionName);
}
return getExtensionResource(resources.get(resourceName));
http://git-wip-us.apache.org/repos/asf/falcon/blob/ccc343fa/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
index 0835f38..1b33e1b 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -36,10 +36,10 @@ import org.testng.annotations.Test;
import javax.persistence.EntityManager;
import javax.persistence.Query;
+import java.io.OutputStreamWriter;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
import java.net.URISyntaxException;
import java.util.Map;
@@ -139,6 +139,8 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
createMETA(extensionPath);
store = ExtensionStore.get();
store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath, "test desc", "falconUser");
+ Assert.assertTrue(store.getResource("toBeDeleted", "README").equals("README"));
+ store.getResource("toBeDeleted", "README");
store.deleteExtension("toBeDeleted", "falconUser");
ExtensionMetaStore metaStore = new ExtensionMetaStore();
Assert.assertEquals(metaStore.getAllExtensions().size(), 0);
@@ -188,14 +190,20 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
if (fs.exists(path)) {
fs.delete(path, true);
}
- fs.create(path);
- path = new Path(extensionPath + "/libs/build/test.jar");
OutputStream os = fs.create(path);
BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
+ br.write("README");
+ fs.create(path);
+ br.close();
+ os.close();
+ path = new Path(extensionPath + "/libs/build/test.jar");
+ os = fs.create(path);
+ br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
br.write("Hello World");
br.write("test jar");
fs.create(path);
br.close();
+ os.close();
}
private void clearDB() {