You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/10/23 05:52:47 UTC

[13/26] falcon git commit: FALCON-2288 overwriting jobName while job submission for trusted extensions

FALCON-2288 overwriting jobName while job submission for trusted extensions

Author: Pracheer Agarwal <pr...@inmobi.com>
Author: Pracheer Agarwal <pr...@gmail.com>
Author: Pracheer Agarwal <pr...@im2216-x0.corp.inmobi.com>
Author: Pracheer <pr...@cheer.corp.inmobi.com>

Reviewers: @pallavi-rao,@sandeepSamudrala

Closes #373 from PracheerAgarwal/FALCON-2288 and squashes the following commits:

caa25d7 [Pracheer Agarwal] FALCON-2288 code restructuring
d775b0d [Pracheer Agarwal] fixing checkstyle failures
fcaf14a [Pracheer Agarwal] FALCON-2288, overwriting job_name for trusted extension
83d9873 [Pracheer Agarwal] FALCON-2288, overwriting job_name for trusted extension
41da933 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
fc6ef2b [Pracheer] Merge branch 'master' of https://github.com/apache/falcon
7814fba [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
ba60452 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
ed65aa0 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
9ff05df [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
9c2f0a5 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
9cd8c17 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
778c579 [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
e39808d [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
a932633 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
fda3b28 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
a93d71a [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
e3728d5 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
066c8e2 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
b20f044 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
7f572a1 [Pracheer Agarwal] Merge branch 'master' of https://github.com/apache/falcon
46042fd [Pracheer Agarwal] Merge branch 'master' of https://github.com/PracheerAgarwal/falcon
daa3ffc [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions
622cae4 [Pracheer Agarwal] FALCON-2225 extension owner added for trusted extensions

(cherry picked from commit ded34d4bdd6367e7b271fe07800d7a856f93abfc)
Signed-off-by: Pallavi Rao <pa...@inmobi.com>


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3eea94b8
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3eea94b8
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3eea94b8

Branch: refs/heads/master
Commit: 3eea94b833187524153b294ad025aba5cb20bf55
Parents: 23229a9
Author: Pracheer Agarwal <pr...@inmobi.com>
Authored: Thu Mar 9 10:36:53 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Mar 9 10:37:17 2017 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/extensions/Extension.java |  4 --
 .../resource/proxy/ExtensionManagerProxy.java   | 41 +++++++++++++-------
 2 files changed, 27 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3eea94b8/extensions/src/main/java/org/apache/falcon/extensions/Extension.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/Extension.java b/extensions/src/main/java/org/apache/falcon/extensions/Extension.java
index 6b94323..3869718 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/Extension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/Extension.java
@@ -21,7 +21,6 @@ package org.apache.falcon.extensions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.Pair;
-import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.v0.Entity;
 import org.apache.falcon.entity.v0.feed.Schema;
 import org.apache.falcon.extensions.store.ExtensionStore;
@@ -110,9 +109,6 @@ public class Extension implements ExtensionBuilder {
             throw new FalconException("Entity created from the extension template cannot be null");
         }
         LOG.info("Extension processing complete");
-        // add tags on extension name and job
-        String jobName = configProperties.getProperty(ExtensionProperties.JOB_NAME.getName());
-        EntityUtil.applyTags(extensionName, jobName, Collections.singletonList(entity));
         return Collections.singletonList(entity);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3eea94b8/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index 033b6cc..9808892 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.Properties;
 import javax.servlet.ServletInputStream;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.Consumes;
@@ -57,6 +58,7 @@ import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.entity.v0.process.Process;
 import org.apache.falcon.extensions.Extension;
+import org.apache.falcon.extensions.ExtensionProperties;
 import org.apache.falcon.extensions.ExtensionService;
 import org.apache.falcon.extensions.ExtensionType;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
@@ -300,26 +302,37 @@ public class ExtensionManagerProxy extends AbstractExtensionManager {
         List<Entity> entities;
         TreeMap<EntityType, List<Entity>> entityMap = new TreeMap<>();
         if (ExtensionType.TRUSTED.equals(extensionType)) {
-            entities = extension.getEntities(extensionName, config);
-            List<Entity> trustedFeeds = new ArrayList<>();
-            List<Entity> trustedProcesses = new ArrayList<>();
+            entities = extension.getEntities(jobName, addJobNameToConf(config, jobName));
+            feeds = new ArrayList<>();
+            processes = new ArrayList<>();
             for (Entity entity : entities) {
                 if (EntityType.FEED.equals(entity.getEntityType())) {
-                    trustedFeeds.add(entity);
+                    feeds.add(entity);
                 } else {
-                    trustedProcesses.add(entity);
+                    processes.add(entity);
                 }
             }
-            entityMap.put(EntityType.PROCESS, trustedProcesses);
-            entityMap.put(EntityType.FEED, trustedFeeds);
-            return entityMap;
-        } else {
-            EntityUtil.applyTags(extensionName, jobName, processes);
-            EntityUtil.applyTags(extensionName, jobName, feeds);
-            entityMap.put(EntityType.PROCESS, processes);
-            entityMap.put(EntityType.FEED, feeds);
-            return entityMap;
         }
+        // add tags on extension name and job
+        EntityUtil.applyTags(extensionName, jobName, processes);
+        EntityUtil.applyTags(extensionName, jobName, feeds);
+        entityMap.put(EntityType.PROCESS, processes);
+        entityMap.put(EntityType.FEED, feeds);
+        return entityMap;
+    }
+
+    private InputStream addJobNameToConf(InputStream conf, String jobName) throws  FalconException{
+        Properties inputProperties = new Properties();
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        try {
+            inputProperties.load(conf);
+            inputProperties.setProperty(ExtensionProperties.JOB_NAME.getName(), jobName);
+            inputProperties.store(output, null);
+        } catch (IOException e) {
+            LOG.error("Error in reading the config stream");
+            throw new FalconException("Error while reading the config stream", e);
+        }
+        return new ByteArrayInputStream(output.toByteArray());
     }
 
     private ExtensionType getExtensionType(String extensionName) {