You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2017/01/19 05:23:15 UTC

falcon git commit: FALCON-2259 Unregister an extension only if no extension jobs are dependant on the extension

Repository: falcon
Updated Branches:
  refs/heads/master 6e162747b -> 3c011688b


FALCON-2259 Unregister an extension only if no extension jobs are dependant on the extension

Author: sandeep <sa...@gmail.com>

Reviewers: @pallavi-rao

Closes #342 from sandeepSamudrala/FALCON-2259 and squashes the following commits:

d317d3d [sandeep] Fixed tests
6617993 [sandeep] Incorporated review comments
46ad617 [sandeep] Incorporated review comments
c1189be [sandeep] Incorporated review comments
a70f5a9 [sandeep] FALCON-2259 Unregister an extension only if no extension jobs are dependant on the extension
7e16263 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a234d94 [sandeep] FALCON-2231 Incoporated review comments and small fixes for duplicate submission and colo addition to schedule command
26e3350 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
73fbf75 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
cc28658 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
089b10d [sandeep] Merge branch 'master' of https://github.com/apache/falcon
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/3c011688
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/3c011688
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/3c011688

Branch: refs/heads/master
Commit: 3c011688bf910ee47786afa6f6c582d9c8d356ef
Parents: 6e16274
Author: sandeep <sa...@gmail.com>
Authored: Thu Jan 19 10:53:06 2017 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Thu Jan 19 10:53:06 2017 +0530

----------------------------------------------------------------------
 .../falcon/persistence/ExtensionJobsBean.java   |  3 +-
 .../persistence/PersistenceConstants.java       |  7 ++--
 .../extensions/jdbc/ExtensionMetaStore.java     | 12 ++++++
 .../falcon/extensions/store/ExtensionStore.java | 12 ++++++
 .../resource/AbstractExtensionManager.java      | 24 +++++++++---
 .../apache/falcon/unit/FalconUnitClient.java    |  6 ++-
 .../falcon/unit/LocalExtensionManager.java      |  2 +-
 .../apache/falcon/unit/FalconUnitTestBase.java  | 18 ++++-----
 .../org/apache/falcon/unit/TestFalconUnit.java  | 39 ++++++++++----------
 9 files changed, 84 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
index 15a4dac..b6ac79d 100644
--- a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
+++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
@@ -44,7 +44,8 @@ import java.util.List;
 @NamedQueries({
         @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
         @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
-        @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName")
+        @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName"),
+        @NamedQuery(name = PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION, query = "select OBJECT(a) from ExtensionJobsBean a where a.extensionName = :extensionName")
 })
 //RESUME CHECKSTYLE CHECK  LineLengthCheck
 public class ExtensionJobsBean {

http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index e80f7b7..1e6a04b 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -60,9 +60,9 @@ public final class PersistenceConstants {
     public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";
 
     public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
-    public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
-    public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
-    public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
+    static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
+    static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
+    static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
     public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
 
     public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
@@ -80,5 +80,6 @@ public final class PersistenceConstants {
     public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
     public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
     public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
+    public static final String GET_JOBS_FOR_AN_EXTENSION = "GET_JOBS_FOR_AN_EXTENSION";
     public static final String GET_ALL_PROCESS_INFO_INSTANCES = "GET_ALL_PROCESS_INFO_INSTANCES";
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index df5d6c9..b47766c 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -137,6 +137,18 @@ public class ExtensionMetaStore {
         }
     }
 
+    public List<ExtensionJobsBean> getJobsForAnExtension(String extensionName) {
+        EntityManager entityManager = getEntityManager();
+        beginTransaction(entityManager);
+        Query query = entityManager.createNamedQuery(PersistenceConstants.GET_JOBS_FOR_AN_EXTENSION);
+        query.setParameter(EXTENSION_NAME, extensionName);
+        try {
+            return (List<ExtensionJobsBean>)query.getResultList();
+        } finally {
+            commitAndCloseTransaction(entityManager);
+        }
+    }
+
     public void deleteExtension(String extensionName){
         EntityManager entityManager = getEntityManager();
         beginTransaction(entityManager);

http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
index c50d6de..c3b4feb 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -28,6 +28,7 @@ import org.apache.falcon.extensions.ExtensionType;
 import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.persistence.ExtensionBean;
+import org.apache.falcon.persistence.ExtensionJobsBean;
 import org.apache.falcon.util.StartupProperties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -382,6 +383,17 @@ public final class ExtensionStore {
         return (storePath != null);
     }
 
+    public List<String> getJobsForAnExtension(final String extensionName) throws FalconException {
+        List<ExtensionJobsBean> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
+        List<String> extensionJobNames = new ArrayList<>();
+        if (null != extensionJobs && !extensionJobs.isEmpty()) {
+            for (ExtensionJobsBean extensionJobsBean : extensionJobs) {
+                extensionJobNames.add(extensionJobsBean.getJobName());
+            }
+        }
+        return extensionJobNames;
+    }
+
     public String updateExtensionStatus(final String extensionName, String currentUser, ExtensionStatus status) throws
             FalconException {
         validateStatusChange(extensionName, currentUser);

http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 63974f2..8ada576 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.falcon.resource;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
 import org.apache.falcon.FalconWebException;
@@ -99,13 +100,26 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
         }
     }
 
-    public APIResult deleteExtensionMetadata(String extensionName){
+    public APIResult deleteExtensionMetadata(String extensionName) {
         validateExtensionName(extensionName);
+        ExtensionStore metaStore = ExtensionStore.get();
         try {
-            return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().deleteExtension(extensionName,
-                    CurrentUser.getUser()));
-        } catch (Throwable e) {
-            throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
+            canDeleteExtension(extensionName);
+            return new APIResult(APIResult.Status.SUCCEEDED,
+                    metaStore.deleteExtension(extensionName, CurrentUser.getUser()));
+        } catch (FalconException e) {
+            throw FalconWebException.newAPIException(e);
+        }
+    }
+
+    private void canDeleteExtension(String extensionName) throws FalconException {
+        ExtensionStore metaStore = ExtensionStore.get();
+        List<String> extensionJobs = metaStore.getJobsForAnExtension(extensionName);
+        if (!extensionJobs.isEmpty()) {
+            LOG.error("Extension:{} cannot be unregistered as {} are instances of the extension", extensionName,
+                    ArrayUtils.toString(extensionJobs));
+            throw new FalconException("Extension:" + extensionName + " cannot be unregistered as following instances"
+                    + " are dependent on the extension" + ArrayUtils.toString(extensionJobs));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
index d76dbca..9358958 100644
--- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
+++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java
@@ -277,7 +277,11 @@ public class FalconUnitClient extends AbstractFalconClient {
 
     @Override
     public APIResult unregisterExtension(String extensionName) {
-        return localExtensionManager.unRegisterExtension(extensionName);
+        try {
+            return localExtensionManager.unRegisterExtension(extensionName);
+        } catch (FalconException e) {
+            throw new FalconCLIException("Failed in unRegistering the extension"+ e.getMessage());
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
index a7303b1..ca39ddb 100644
--- a/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
+++ b/unit/src/main/java/org/apache/falcon/unit/LocalExtensionManager.java
@@ -179,7 +179,7 @@ public class LocalExtensionManager extends AbstractExtensionManager {
         return super.registerExtensionMetadata(extensionName, packagePath, description, CurrentUser.getUser());
     }
 
-    APIResult unRegisterExtension(String extensionName) {
+    APIResult unRegisterExtension(String extensionName) throws FalconException {
         return super.deleteExtensionMetadata(extensionName);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
index 0dd09c1..e9367d5 100644
--- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
+++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java
@@ -218,29 +218,29 @@ public class FalconUnitTestBase {
         return props;
     }
 
-    public String registerExtension(String extensionName, String packagePath, String description)
+    APIResult registerExtension(String extensionName, String packagePath, String description)
         throws IOException, FalconException {
 
-        return falconUnitClient.registerExtension(extensionName, packagePath, description).getMessage();
+        return falconUnitClient.registerExtension(extensionName, packagePath, description);
     }
 
-    public String disableExtension(String extensionName) {
+    String disableExtension(String extensionName) {
         return falconUnitClient.disableExtension(extensionName).getMessage();
     }
 
-    public String enableExtension(String extensionName) {
+    String enableExtension(String extensionName) {
         return falconUnitClient.enableExtension(extensionName).getMessage();
     }
 
-    public String getExtensionJobDetails(String jobName) {
-        return falconUnitClient.getExtensionJobDetails(jobName).getMessage();
+    APIResult getExtensionJobDetails(String jobName) {
+        return falconUnitClient.getExtensionJobDetails(jobName);
     }
 
-    public String unregisterExtension(String extensionName) {
-        return falconUnitClient.unregisterExtension(extensionName).getMessage();
+    APIResult unregisterExtension(String extensionName) {
+        return falconUnitClient.unregisterExtension(extensionName);
     }
 
-    public APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
+    APIResult submitExtensionJob(String extensionName, String jobName, String configPath, String doAsUser) {
         return falconUnitClient.submitExtensionJob(extensionName, jobName, configPath, doAsUser);
     }
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/3c011688/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
----------------------------------------------------------------------
diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
index 29cfed4..72280f7 100644
--- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
+++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java
@@ -245,7 +245,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
         assertStatus(result);
     }
 
-    public void setDummyProperty(Process process) {
+    private void setDummyProperty(Process process) {
         Property property = new Property();
         property.setName("dummy");
         property.setValue("dummy");
@@ -424,13 +424,12 @@ public class TestFalconUnit extends FalconUnitTestBase {
         clearDB();
         submitCluster();
         createExtensionPackage();
-
-        String result = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString()
+        APIResult apiResult = registerExtension("testExtension", new Path(STORAGE_URL + EXTENSION_PATH).toString()
                 , "testExtension");
-        Assert.assertEquals(result, "Extension :testExtension registered successfully.");
-
-        result = unregisterExtension("testExtension");
-        Assert.assertEquals(result, "Deleted extension:testExtension");
+        assertStatus(apiResult);
+        apiResult = unregisterExtension("testExtension");
+        assertStatus(apiResult);
+        Assert.assertEquals(apiResult.getMessage(), "Deleted extension:testExtension");
     }
 
     @Test
@@ -441,8 +440,8 @@ public class TestFalconUnit extends FalconUnitTestBase {
         createDir(PROCESS_APP_PATH);
         fs.copyFromLocalFile(new Path(getAbsolutePath(WORKFLOW)), new Path(PROCESS_APP_PATH, "workflow.xml"));
         String packageBuildLib = new Path(EXTENSION_PATH, "libs/build/").toString();
-        String result = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
-        Assert.assertEquals(result, "Extension :testExtension registered successfully.");
+        APIResult apiResult = registerExtension(TEST_EXTENSION, STORAGE_URL + EXTENSION_PATH, TEST_EXTENSION);
+        assertStatus(apiResult);
 
         disableExtension(TEST_EXTENSION);
         createDir(PROCESS_APP_PATH);
@@ -457,10 +456,10 @@ public class TestFalconUnit extends FalconUnitTestBase {
         }
         enableExtension(TEST_EXTENSION);
 
-        APIResult apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
+        apiResult = submitExtensionJob(TEST_EXTENSION, TEST_JOB, null, null);
         assertStatus(apiResult);
-        result = getExtensionJobDetails(TEST_JOB);
-        JSONObject resultJson = new JSONObject(result);
+        apiResult = getExtensionJobDetails(TEST_JOB);
+        JSONObject resultJson = new JSONObject(apiResult.getMessage());
         Assert.assertEquals(resultJson.get("extensionName"), TEST_EXTENSION);
         Process process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
         Assert.assertEquals(process.getPipelines(), "testPipeline");
@@ -482,7 +481,7 @@ public class TestFalconUnit extends FalconUnitTestBase {
         apiResult = updateExtensionJob(TEST_JOB, getAbsolutePath(EXTENSION_PROPERTIES), null);
         assertStatus(apiResult);
 
-        String processes = new JSONObject(getExtensionJobDetails(TEST_JOB)).get("processes").toString();
+        String processes = new JSONObject(getExtensionJobDetails(TEST_JOB).getMessage()).get("processes").toString();
         Assert.assertEquals(processes, "sample");
         process = (Process) getClient().getDefinition(EntityType.PROCESS.toString(), "sample", null);
         Assert.assertEquals(process.getPipelines(), "testSample");
@@ -491,6 +490,12 @@ public class TestFalconUnit extends FalconUnitTestBase {
         assertStatus(apiResult);
         Assert.assertEquals(apiResult.getMessage(), "RUNNING");
 
+        try {
+            unregisterExtension(TEST_EXTENSION);
+            Assert.fail("Should have thrown a FalconCLIException");
+        } catch (FalconWebException e) {
+            //Do nothing. Exception expected as there are dependent extension jobs and so extension cannot be deleted.
+        }
         apiResult = deleteExtensionJob(TEST_JOB, null);
         assertStatus(apiResult);
         try {
@@ -506,14 +511,10 @@ public class TestFalconUnit extends FalconUnitTestBase {
             Assert.assertEquals(((APIResult) e.getResponse().getEntity()).getMessage(), "Job name not found:testJob");
             //Do nothing. Exception Expected.
         }
+        apiResult = unregisterExtension(TEST_EXTENSION);
+        assertStatus(apiResult);
     }
 
-    @Test
-    public void testExtensionJobSuspendAndResume() throws Exception {
-
-    }
-
-
     private void copyExtensionJar(String destDirPath) throws IOException {
         File dir = new File(new Path(JARS_DIR).toUri().toURL().getPath());
         for (File file : dir.listFiles()) {