You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by pa...@apache.org on 2016/11/25 11:39:44 UTC
falcon git commit: FALCON-2191 Extension Job store changes and rest
api changes for submit and submitAndSchedule extension services.
Repository: falcon
Updated Branches:
refs/heads/master 3f5087997 -> 49fa46e29
FALCON-2191 Extension Job store changes and rest api changes for submit and submitAndSchedule extension services.
Author: sandeep <sa...@gmail.com>
Reviewers: @pallavi-rao
Closes #307 from sandeepSamudrala/FALCON-2191 and squashes the following commits:
2928bce [sandeep] FALCON-2191. Rebasing the patch
2c14f41 [sandeep] FALCON-2191 Incorporated review comments. modified validate method to check for the entity being process and feed only
5da9141 [sandeep] FALCON-2191 Incorporated review comments
ca45a2e [sandeep] FALCON-2191 Extension Job store changes and rest api changes for submit and submitAndSchedule extension services
9487132 [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2191
fd2357b [sandeep] Merge branch 'master' of https://github.com/apache/falcon into FALCON-2190
8aacd75 [sandeep] FALCON-2183 Incorporated review comments
f3d7268 [sandeep] FALCON-2183 Incorporated review comments
11e7b3f [sandeep] FALCON-2183 Extension Builder changes to support new user extensions
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/49fa46e2
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/49fa46e2
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/49fa46e2
Branch: refs/heads/master
Commit: 49fa46e29540a869c3a669f779f662f5543b8d10
Parents: 3f50879
Author: sandeep <sa...@gmail.com>
Authored: Fri Nov 25 17:09:32 2016 +0530
Committer: Pallavi Rao <pa...@inmobi.com>
Committed: Fri Nov 25 17:09:32 2016 +0530
----------------------------------------------------------------------
.../falcon/persistence/ExtensionBean.java | 117 +++++++++++++++
.../falcon/persistence/ExtensionJobsBean.java | 145 +++++++++++++++++++
.../persistence/ExtensionMetadataBean.java | 113 ---------------
.../persistence/PersistenceConstants.java | 9 ++
.../falcon/tools/FalconStateStoreDBCLI.java | 3 +-
.../src/main/resources/META-INF/persistence.xml | 16 +-
.../extensions/jdbc/ExtensionMetaStore.java | 89 +++++++++---
.../falcon/extensions/store/ExtensionStore.java | 47 +++---
.../extensions/jdbc/ExtensionMetaStoreTest.java | 39 +++--
.../extensions/store/ExtensionStoreTest.java | 33 +++--
pom.xml | 6 +
prism/pom.xml | 11 +-
.../resource/extensions/ExtensionManager.java | 115 ++++++++++-----
src/build/findbugs-exclude.xml | 7 +-
14 files changed, 519 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
new file mode 100644
index 0000000..2cade5b
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionBean.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+import org.apache.falcon.extensions.ExtensionType;
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.EnumType;
+import javax.persistence.Enumerated;
+import javax.persistence.Id;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+/**
+ * Table to store extensions.
+ */
+
+@Table(name = "EXTENSIONS")
+@Entity
+@NamedQueries({
+ @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionBean a "),
+ @NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionBean a where a.extensionType = :extensionType "),
+ @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionBean a where a.extensionName = :extensionName "),
+ @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName")
+})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class ExtensionBean {
+ @Basic
+ @NotNull
+ @Id
+ @Column(name = "extension_name")
+ private String extensionName;
+
+ @Basic
+ @NotNull
+ @Column(name = "extension_type")
+ @Enumerated(EnumType.STRING)
+ private ExtensionType extensionType;
+
+ @Basic
+ @Column(name = "description")
+ private String description;
+
+ @Basic
+ @NotNull
+ @Column(name = "location")
+ private String location;
+
+ @Basic
+ @NotNull
+ @Column(name = "creation_time")
+ private Date creationTime;
+
+ public ExtensionType getExtensionType() {
+ return extensionType;
+ }
+
+ public void setExtensionType(ExtensionType extensionType) {
+ this.extensionType = extensionType;
+ }
+
+ public Date getCreationTime() {
+ return creationTime;
+ }
+
+ public void setCreationTime(Date creationTime) {
+ this.creationTime = creationTime;
+ }
+
+
+ public String getExtensionName() {
+ return extensionName;
+ }
+
+ public void setExtensionName(String extensionName) {
+ this.extensionName = extensionName;
+ }
+
+ public String getLocation() {
+ return location;
+ }
+
+ public void setLocation(String location) {
+ this.location = location;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public void setDescription(String description) {
+ this.description = description;
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
new file mode 100644
index 0000000..2dc66f8
--- /dev/null
+++ b/common/src/main/java/org/apache/falcon/persistence/ExtensionJobsBean.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.persistence;
+
+
+//SUSPEND CHECKSTYLE CHECK LineLengthCheck
+
+import javax.persistence.Basic;
+import javax.persistence.Column;
+import javax.persistence.Entity;
+import javax.persistence.FetchType;
+import javax.persistence.Id;
+import javax.persistence.Lob;
+import javax.persistence.NamedQueries;
+import javax.persistence.NamedQuery;
+import javax.persistence.Table;
+import javax.validation.constraints.NotNull;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * Table to store extension jobs.
+ */
+
+@Table(name = "EXTENSION_JOBS")
+@Entity
+@NamedQueries({
+ @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSION_JOBS, query = "select OBJECT(a) from ExtensionJobsBean a "),
+ @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION_JOB, query = "delete from ExtensionJobsBean a where a.jobName = :jobName "),
+ @NamedQuery(name = PersistenceConstants.GET_EXTENSION_JOB, query = "select OBJECT(a) from ExtensionJobsBean a where a.jobName = :jobName")
+})
+//RESUME CHECKSTYLE CHECK LineLengthCheck
+public class ExtensionJobsBean {
+
+ @Basic
+ @NotNull
+ @Id
+ @Column(name = "job_name")
+ private String jobName;
+
+ @Basic
+ @NotNull
+ @Column(name = "extension_name")
+ private String extensionName;
+
+ @Basic
+ @NotNull
+ @Column(name = "feeds")
+ private String[] feeds;
+
+ @Basic
+ @NotNull
+ @Column(name = "processes")
+ private String[] processes;
+
+ @Lob
+ @Basic(fetch= FetchType.LAZY)
+ @Column(name = "config")
+ private byte[] config;
+
+
+ @Basic
+ @NotNull
+ @Column(name = "creation_time")
+ private Date creationTime;
+
+ @Basic
+ @NotNull
+ @Column(name = "last_updated_time")
+ private Date lastUpdatedTime;
+
+ public String getJobName() {
+ return jobName;
+ }
+
+ public void setJobName(String jobName) {
+ this.jobName = jobName;
+ }
+
+ public Date getCreationTime() {
+ return creationTime;
+ }
+
+ public void setCreationTime(Date creationTime) {
+ this.creationTime = creationTime;
+ }
+
+
+ public byte[] getConfig() {
+ return config;
+ }
+
+ public void setConfig(byte[] config) {
+ this.config = config;
+ }
+
+ public String getExtensionName() {
+ return extensionName;
+ }
+
+ public void setExtensionName(String extensionName) {
+ this.extensionName = extensionName;
+ }
+
+ public Date getLastUpdatedTime() {
+ return lastUpdatedTime;
+ }
+
+ public void setLastUpdatedTime(Date lastUpdatedTime) {
+ this.lastUpdatedTime = lastUpdatedTime;
+ }
+
+ public List<String> getFeeds() {
+ return Arrays.asList(feeds);
+ }
+
+ public void setFeeds(List<String> feeds) {
+ this.feeds = feeds.toArray(new String[feeds.size()]);
+ }
+
+ public List<String> getProcesses() {
+ return Arrays.asList(processes);
+ }
+
+ public void setProcesses(List<String> processes) {
+ this.processes = processes.toArray(new String[processes.size()]);
+ }
+}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/persistence/ExtensionMetadataBean.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/ExtensionMetadataBean.java b/common/src/main/java/org/apache/falcon/persistence/ExtensionMetadataBean.java
deleted file mode 100644
index 9f4cf72..0000000
--- a/common/src/main/java/org/apache/falcon/persistence/ExtensionMetadataBean.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.falcon.persistence;
-
-import javax.persistence.Basic;
-import javax.persistence.Column;
-import javax.persistence.Entity;
-import javax.persistence.Id;
-import javax.persistence.NamedQueries;
-import javax.persistence.NamedQuery;
-import javax.persistence.Table;
-import javax.validation.constraints.NotNull;
-import java.util.Date;
-
-
-//SUSPEND CHECKSTYLE CHECK LineLengthCheck
-/**
- * Table to store extension metadata.
- */
-
-@Table(name = "EXTENSION_METADATA")
-@Entity
-@NamedQueries({
- @NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionMetadataBean a "),
- @NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionMetadataBean a where a.extensionType = :extensionType "),
- @NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionMetadataBean a where a.extensionName = :extensionName "),
- @NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionMetadataBean a where a.extensionName = :extensionName")
-})
-//RESUME CHECKSTYLE CHECK LineLengthCheck
-public class ExtensionMetadataBean {
- @Basic
- @NotNull
- @Id
- @Column(name = "extension_name")
- private String extensionName;
-
-
- @Basic
- @NotNull
- @Column(name = "extension_type")
- private String extensionType;
-
- @Basic
- @Column(name = "description")
- private String description;
-
- @Basic
- @NotNull
- @Column(name = "location")
- private String location;
-
-
- @Basic
- @NotNull
- @Column(name = "creation_time")
- private Date creationTime;
-
- public String getExtensionType() {
- return extensionType;
- }
-
- public void setExtensionType(String extensionType) {
- this.extensionType = extensionType;
- }
-
- public Date getCreationTime() {
- return creationTime;
- }
-
- public void setCreationTime(Date creationTime) {
- this.creationTime = creationTime;
- }
-
-
- public String getExtensionName() {
- return extensionName;
- }
-
- public void setExtensionName(String extensionName) {
- this.extensionName = extensionName;
- }
-
- public String getLocation() {
- return location;
- }
-
- public void setLocation(String location) {
- this.location = location;
- }
-
- public String getDescription() {
- return description;
- }
-
- public void setDescription(String description) {
- this.description = description;
- }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
index 94eb32e..fc82ae7 100644
--- a/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
+++ b/common/src/main/java/org/apache/falcon/persistence/PersistenceConstants.java
@@ -36,6 +36,7 @@ public final class PersistenceConstants {
public static final String DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY = "DELETE_ALL_PENDING_INSTANCES_FOR_ENTITY";
public static final String GET_DATE_FOR_PENDING_INSTANCES = "GET_DATE_FOR_PENDING_INSTANCES";
public static final String GET_ALL_PENDING_INSTANCES = "GET_ALL_PENDING_INSTANCES";
+
public static final String GET_ENTITY = "GET_ENTITY";
public static final String GET_ENTITY_FOR_STATE = "GET_ENTITY_FOR_STATE";
public static final String UPDATE_ENTITY = "UPDATE_ENTITY";
@@ -57,18 +58,26 @@ public final class PersistenceConstants {
public static final String GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER = "GET_LAST_INSTANCE_FOR_ENTITY_CLUSTER";
public static final String DELETE_INSTANCES_TABLE = "DELETE_INSTANCES_TABLE";
public static final String GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE = "GET_INSTANCE_SUMMARY_BY_STATE_WITH_RANGE";
+
public static final String GET_LATEST_INSTANCE_TIME = "GET_LATEST_INSTANCE_TIME";
public static final String GET_ENTITY_ALERTS = "GET_ENTITY_ALERTS";
public static final String GET_ALL_ENTITY_ALERTS = "GET_ALL_ENTITY_ALERTS";
public static final String GET_SLA_HIGH_CANDIDATES = "GET_SLA_HIGH_CANDIDATES";
public static final String UPDATE_SLA_HIGH = "UPDATE_SLA_HIGH";
+
public static final String GET_ENTITY_ALERT_INSTANCE = "GET_ENTITY_ALERT_INSTANCE";
public static final String DELETE_ENTITY_ALERT_INSTANCE = "DELETE_ENTITY_ALERT_INSTANCE";
public static final String DELETE_BACKLOG_METRIC_INSTANCE = "DELETE_BACKLOG_METRIC_INSTANCE";
public static final String GET_ALL_BACKLOG_INSTANCES = "GET_ALL_BACKLOG_INSTANCES";
public static final String DELETE_ALL_BACKLOG_ENTITY_INSTANCES ="DELETE_ALL_BACKLOG_ENTITY_INSTANCES";
+
public static final String GET_ALL_EXTENSIONS = "GET_ALL_EXTENSIONS";
public static final String DELETE_EXTENSIONS_OF_TYPE = "DELETE_EXTENSIONS_OF_TYPE";
public static final String DELETE_EXTENSION = "DELETE_EXTENSION";
public static final String GET_EXTENSION = "GET_EXTENSION";
+
+ public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
+ public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
+ public static final String GET_EXTENSION_JOB = "GET_EXTENSION_JOB";
+
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
index e12b982..0c04da3 100644
--- a/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
+++ b/common/src/main/java/org/apache/falcon/tools/FalconStateStoreDBCLI.java
@@ -247,7 +247,8 @@ public class FalconStateStoreDBCLI {
args.add("org.apache.falcon.persistence.MonitoredEntityBean");
args.add("org.apache.falcon.persistence.EntitySLAAlertBean");
args.add("org.apache.falcon.persistence.BacklogMetricBean");
- args.add("org.apache.falcon.persistence.ExtensionMetadataBean");
+ args.add("org.apache.falcon.persistence.ExtensionBean");
+ args.add("org.apache.falcon.persistence.ExtensionJobsBean");
return args.toArray(new String[args.size()]);
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/common/src/main/resources/META-INF/persistence.xml
----------------------------------------------------------------------
diff --git a/common/src/main/resources/META-INF/persistence.xml b/common/src/main/resources/META-INF/persistence.xml
index 1fbcc9d..0f20103 100644
--- a/common/src/main/resources/META-INF/persistence.xml
+++ b/common/src/main/resources/META-INF/persistence.xml
@@ -29,7 +29,8 @@
<class>org.apache.falcon.persistence.MonitoredEntityBean</class>
<class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
<class>org.apache.falcon.persistence.BacklogMetricBean</class>
- <class>org.apache.falcon.persistence.ExtensionMetadataBean</class>
+ <class>org.apache.falcon.persistence.ExtensionBean</class>
+ <class>org.apache.falcon.persistence.ExtensionJobsBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -40,7 +41,7 @@
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
- org.apache.falcon.persistence.ExtensionMetadataBean)"></property>
+ org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
@@ -64,7 +65,8 @@
<class>org.apache.falcon.persistence.MonitoredEntityBean</class>
<class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
<class>org.apache.falcon.persistence.BacklogMetricBean</class>
- <class>org.apache.falcon.persistence.ExtensionMetadataBean</class>
+ <class>org.apache.falcon.persistence.ExtensionBean</class>
+ <class>org.apache.falcon.persistence.ExtensionJobsBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -74,7 +76,7 @@
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
- org.apache.falcon.persistence.ExtensionMetadataBean)"></property>
+ org.apache.falcon.persistence.ExtensionBean;org.apache.falcon.persistence.ExtensionJobsBean)"/>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
@@ -97,7 +99,8 @@
<class>org.apache.falcon.persistence.PendingInstanceBean</class>
<class>org.apache.falcon.persistence.EntitySLAAlertBean</class>
<class>org.apache.falcon.persistence.BacklogMetricBean</class>
- <class>org.apache.falcon.persistence.ExtensionMetadataBean</class>
+ <class>org.apache.falcon.persistence.ExtensionBean</class>
+ <class>org.apache.falcon.persistence.ExtensionJobsBean</class>
<properties>
<property name="openjpa.ConnectionDriverName" value="org.apache.commons.dbcp.BasicDataSource"/>
@@ -107,7 +110,8 @@
value="jpa(Types=org.apache.falcon.persistence.EntityBean;
org.apache.falcon.persistence.InstanceBean;org.apache.falcon.persistence.PendingInstanceBean;
org.apache.falcon.persistence.MonitoredEntityBean;org.apache.falcon.persistence.EntitySLAAlertBean;
- org.apache.falcon.persistence.BacklogMetricBean;org.apache.falcon.persistence.ExtensionMetadataBean)"/>
+ org.apache.falcon.persistence.BacklogMetricBean;org.apache.falcon.persistence.ExtensionBean;
+ org.apache.falcon.persistence.ExtensionJobsBean)"/>
<property name="openjpa.DetachState" value="fetch-groups(DetachedStateField=true)"/>
<property name="openjpa.LockManager" value="pessimistic"/>
<property name="openjpa.ReadLockLevel" value="read"/>
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
index 0a1a0e7..5501146 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStore.java
@@ -18,7 +18,8 @@
package org.apache.falcon.extensions.jdbc;
import org.apache.falcon.extensions.ExtensionType;
-import org.apache.falcon.persistence.ExtensionMetadataBean;
+import org.apache.falcon.persistence.ExtensionBean;
+import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.persistence.PersistenceConstants;
import org.apache.falcon.service.FalconJPAService;
@@ -32,39 +33,43 @@ import java.util.List;
*/
public class ExtensionMetaStore {
+ private static final String EXTENSION_NAME = "extensionName";
+ private static final String JOB_NAME = "jobName";
+ private static final String EXTENSION_TYPE = "extensionType";
+
private EntityManager getEntityManager() {
return FalconJPAService.get().getEntityManager();
}
- public void storeExtensionMetadataBean(String extensionName, String location, ExtensionType extensionType,
- String description){
- ExtensionMetadataBean extensionMetadataBean = new ExtensionMetadataBean();
- extensionMetadataBean.setLocation(location);
- extensionMetadataBean.setExtensionName(extensionName);
- extensionMetadataBean.setExtensionType(extensionType.toString());
- extensionMetadataBean.setCreationTime(new Date(System.currentTimeMillis()));
- extensionMetadataBean.setDescription(description);
+ public void storeExtensionBean(String extensionName, String location, ExtensionType extensionType,
+ String description){
+ ExtensionBean extensionBean = new ExtensionBean();
+ extensionBean.setLocation(location);
+ extensionBean.setExtensionName(extensionName);
+ extensionBean.setExtensionType(extensionType);
+ extensionBean.setCreationTime(new Date(System.currentTimeMillis()));
+ extensionBean.setDescription(description);
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
- entityManager.persist(extensionMetadataBean);
+ entityManager.persist(extensionBean);
} finally {
commitAndCloseTransaction(entityManager);
}
}
- public Boolean checkIfExtensionExists(String extensionName){
+ public Boolean checkIfExtensionExists(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION);
- q.setParameter("extensionName", extensionName);
+ q.setParameter(EXTENSION_NAME, extensionName);
if (q.getResultList().size() > 0){
return true;
}
return false;
}
- public List<ExtensionMetadataBean> getAllExtensions(){
+ public List<ExtensionBean> getAllExtensions() {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSIONS);
@@ -75,11 +80,11 @@ public class ExtensionMetaStore {
}
}
- public void deleteExtensionsOfType(ExtensionType extensionType){
+ public void deleteExtensionsOfType(ExtensionType extensionType) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE);
- q.setParameter("extensionType", extensionType.toString());
+ q.setParameter(EXTENSION_TYPE, extensionType);
try{
q.executeUpdate();
} finally {
@@ -87,23 +92,23 @@ public class ExtensionMetaStore {
}
}
- public ExtensionMetadataBean getDetail(String extensionName){
+ public ExtensionBean getDetail(String extensionName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.GET_EXTENSION);
- q.setParameter("extensionName", extensionName);
+ q.setParameter(EXTENSION_NAME, extensionName);
try {
- return (ExtensionMetadataBean)q.getSingleResult();
+ return (ExtensionBean)q.getSingleResult();
} finally {
commitAndCloseTransaction(entityManager);
}
}
- public void deleteExtensionMetadata(String extensionName){
+ public void deleteExtension(String extensionName){
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSION);
- q.setParameter("extensionName", extensionName);
+ q.setParameter(EXTENSION_NAME, extensionName);
try{
q.executeUpdate();
} finally {
@@ -111,6 +116,50 @@ public class ExtensionMetaStore {
}
}
+ public void storeExtensionJob(String jobName, String extensionName, List<String> feeds, List<String> processes,
+ byte[] config) {
+ ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
+ Date currentTime = new Date(System.currentTimeMillis());
+ extensionJobsBean.setJobName(jobName);
+ extensionJobsBean.setExtensionName(extensionName);
+ extensionJobsBean.setCreationTime(currentTime);
+ extensionJobsBean.setFeeds(feeds);
+ extensionJobsBean.setProcesses(processes);
+ extensionJobsBean.setConfig(config);
+ extensionJobsBean.setLastUpdatedTime(currentTime);
+ EntityManager entityManager = getEntityManager();
+ try {
+ beginTransaction(entityManager);
+ entityManager.persist(extensionJobsBean);
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public void deleteExtensionJob(String jobName) {
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query query = entityManager.createNamedQuery(PersistenceConstants.DELETE_EXTENSION_JOB);
+ query.setParameter(JOB_NAME, jobName);
+ try{
+ query.executeUpdate();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+ public List<ExtensionJobsBean> getAllExtensionJobs() {
+ EntityManager entityManager = getEntityManager();
+ beginTransaction(entityManager);
+ Query q = entityManager.createNamedQuery(PersistenceConstants.GET_ALL_EXTENSION_JOBS);
+ try {
+ return q.getResultList();
+ } finally {
+ commitAndCloseTransaction(entityManager);
+ }
+ }
+
+
private void beginTransaction(EntityManager entityManager) {
entityManager.getTransaction().begin();
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
index e15919f..832d5b7 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -20,38 +20,35 @@ package org.apache.falcon.extensions.store;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.entity.store.StoreAccessException;
import org.apache.falcon.extensions.AbstractExtension;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
-import org.apache.falcon.entity.parser.ValidationException;
+import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-
-import org.apache.falcon.util.StartupProperties;
-import org.apache.falcon.entity.store.StoreAccessException;
-
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
-
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Store for Falcon extensions.
@@ -106,10 +103,10 @@ public final class ExtensionStore {
String description = getShortDescription(extension);
String recipeName = extension;
String location = storePath.toString() + '/' + extension;
- metaStore.storeExtensionMetadataBean(recipeName, location, extensionType, description);
+ metaStore.storeExtensionBean(recipeName, location, extensionType, description);
}
} catch (FalconException e){
- LOG.error("Exception in ExtensionStore:", e);
+ LOG.error("Exception in ExtensionMetaStore:", e);
throw new RuntimeException(e);
}
@@ -239,36 +236,36 @@ public final class ExtensionStore {
}
public List<String> getExtensions() throws StoreAccessException {
- List<String> extesnionList = new ArrayList<>();
+ List<String> extensionList = new ArrayList<>();
try {
FileStatus[] fileStatuses = fs.listStatus(storePath);
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
Path filePath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath());
- extesnionList.add(filePath.getName());
+ extensionList.add(filePath.getName());
}
}
} catch (IOException e) {
throw new StoreAccessException(e);
}
- return extesnionList;
+ return extensionList;
}
- public String deleteExtensionMetadata(final String extensionName) throws ValidationException{
+ public String deleteExtension(final String extensionName) throws ValidationException{
ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? ExtensionType.TRUSTED
: ExtensionType.CUSTOM;
if (extensionType.equals(ExtensionType.TRUSTED)){
throw new ValidationException(extensionName + " is trusted cannot be deleted.");
}
if (metaStore.checkIfExtensionExists(extensionName)) {
- metaStore.deleteExtensionMetadata(extensionName);
+ metaStore.deleteExtension(extensionName);
return "Deleted extension:" + extensionName;
}else {
return "Extension:" + extensionName + " is not registered with Falcon.";
}
}
- public String registerExtensionMetadata(final String extensionName, final String path, final String description)
+ public String registerExtension(final String extensionName, final String path, final String description)
throws URISyntaxException, FalconException {
Configuration conf = new Configuration();
URI uri = new URI(path);
@@ -277,7 +274,7 @@ public final class ExtensionStore {
try {
fileSystem.listStatus(new Path(uri.getPath() + "/README"));
} catch (IOException e){
- LOG.error("Exception in registerExtensionMetadata:", e);
+ LOG.error("Exception in registering Extension:{}", extensionName, e);
throw new ValidationException("README file is not present in the " + path);
}
PathFilter filter=new PathFilter(){
@@ -292,7 +289,7 @@ public final class ExtensionStore {
throw new ValidationException("Jars are not present in the " + uri.getPath() + "libs/build.");
}
} catch (IOException e){
- LOG.error("Exception in registerExtensionMetadata:", e);
+ LOG.error("Exception in registering Extension:{}", extensionName, e);
throw new ValidationException("Jars are not present in the " + uri.getPath() + "libs/build.");
}
FileStatus[] propStatus;
@@ -303,13 +300,13 @@ public final class ExtensionStore {
+ " structure.");
}
} catch (IOException e){
- LOG.error("Exception in registerExtensionMetadata:", e);
+ LOG.error("Exception in registering Extension:{}", extensionName, e);
throw new ValidationException("Directory is not present in the " + uri.getPath() + "/META"
+ " structure.");
}
if (!metaStore.checkIfExtensionExists(extensionName)){
- metaStore.storeExtensionMetadataBean(extensionName, path, ExtensionType.CUSTOM, description);
+ metaStore.storeExtensionBean(extensionName, path, ExtensionType.CUSTOM, description);
}else{
throw new ValidationException(extensionName + " already exsists.");
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
index d0f1c0c..d96fc1f 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/jdbc/ExtensionMetaStoreTest.java
@@ -20,7 +20,7 @@ package org.apache.falcon.extensions.jdbc;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.store.AbstractTestExtensionStore;
-import org.apache.falcon.persistence.ExtensionMetadataBean;
+import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.service.FalconJPAService;
import org.apache.hadoop.conf.Configuration;
@@ -29,9 +29,10 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-
import javax.persistence.EntityManager;
import javax.persistence.Query;
+import java.util.ArrayList;
+import java.util.List;
/**
* Test Cases for ExtensionMetaStore.
@@ -57,28 +58,46 @@ public class ExtensionMetaStoreTest extends AbstractTestExtensionStore {
}
@Test
- public void dbOpertaions(){
+ public void testExtension(){
//insert
- stateStore.storeExtensionMetadataBean("test1", "test_location", ExtensionType.TRUSTED, "test_description");
+ stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description");
Assert.assertEquals(stateStore.getAllExtensions().size(), 1);
//check data
- ExtensionMetadataBean bean = stateStore.getDetail("test1");
+ ExtensionBean bean = stateStore.getDetail("test1");
Assert.assertEquals(bean.getLocation(), "test_location");
//delete
stateStore.deleteExtensionsOfType(ExtensionType.TRUSTED);
Assert.assertEquals(stateStore.getAllExtensions().size(), 0);
}
+ @Test
+ public void testExtensionJob() {
+ // register the extension the job refers to
+ stateStore.storeExtensionBean("test2", "test_location", ExtensionType.CUSTOM, "test2_description");
+ List<String> feedNames = new ArrayList<>();
+ feedNames.add("testFeed");
+ List<String> processNames = new ArrayList<>();
+ processNames.add("testProcess");
+ //insert
+ stateStore.storeExtensionJob("job1", "test2", feedNames, processNames, new byte[0]);
+ Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 1);
+ //delete
+ stateStore.deleteExtensionJob("job1");
+ Assert.assertEquals(stateStore.getAllExtensionJobs().size(), 0);
+ }
+
private void clear() {
- EntityManager em = FalconJPAService.get().getEntityManager();
- em.getTransaction().begin();
+ EntityManager entityManager = FalconJPAService.get().getEntityManager();
+ entityManager.getTransaction().begin();
try {
- Query query = em.createNativeQuery("delete from EXTENSION_METADATA");
+ Query query = entityManager.createNativeQuery("delete from EXTENSIONS");
+ query.executeUpdate();
+ query = entityManager.createNativeQuery("delete from EXTENSION_JOBS");
query.executeUpdate();
} finally {
- em.getTransaction().commit();
- em.close();
+ entityManager.getTransaction().commit();
+ entityManager.close();
}
}
}
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
index 27bea53..50c9b7f 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -19,14 +19,6 @@
package org.apache.falcon.extensions.store;
import com.google.common.collect.ImmutableMap;
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.net.URISyntaxException;
-import java.util.Map;
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.store.StoreAccessException;
@@ -42,6 +34,15 @@ import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.URISyntaxException;
+import java.util.Map;
+
/**
* Tests for extension store.
*/
@@ -105,21 +106,21 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
@Test
- public void testRegisterExtensionMetadata() throws IOException, URISyntaxException, FalconException{
- createlibs();
+ public void testRegisterExtension() throws IOException, URISyntaxException, FalconException{
+ createLibs();
createReadmeAndJar();
createMETA();
store = ExtensionStore.get();
- store.registerExtensionMetadata("test", STORAGE_URL + EXTENSION_PATH, "test desc");
+ store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc");
ExtensionMetaStore metaStore = new ExtensionMetaStore();
Assert.assertEquals(metaStore.getAllExtensions().size(), 1);
}
@Test(expectedExceptions=ValidationException.class)
- public void testFailureCaseRegisterExtensionMetadata() throws IOException, URISyntaxException, FalconException{
+ public void testFailureCaseRegisterExtension() throws IOException, URISyntaxException, FalconException{
store = ExtensionStore.get();
- createlibs();
- store.registerExtensionMetadata("test", STORAGE_URL + EXTENSION_PATH, "test desc");
+ createLibs();
+ store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc");
}
private void createMETA() throws IOException{
@@ -140,7 +141,7 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
br.close();
}
- private void createlibs() throws IOException{
+ private void createLibs() throws IOException{
Path path = new Path(EXTENSION_PATH);
if (fs.exists(path)){
fs.delete(path, true);
@@ -169,7 +170,7 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore {
EntityManager em = FalconJPAService.get().getEntityManager();
em.getTransaction().begin();
try {
- Query query = em.createNativeQuery("delete from EXTENSION_METADATA");
+ Query query = em.createNativeQuery("delete from EXTENSIONS");
query.executeUpdate();
} finally {
em.getTransaction().commit();
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6a15987..95a101b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -699,6 +699,12 @@
</dependency>
<dependency>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-multipart</artifactId>
+ <version>${jersey.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.8.3</version>
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/prism/pom.xml
----------------------------------------------------------------------
diff --git a/prism/pom.xml b/prism/pom.xml
index 11f3944..57cdfe7 100644
--- a/prism/pom.xml
+++ b/prism/pom.xml
@@ -88,12 +88,6 @@
<dependency>
<groupId>org.apache.falcon</groupId>
- <artifactId>falcon-common-types</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.falcon</groupId>
<artifactId>falcon-oozie-adaptor</artifactId>
</dependency>
@@ -130,6 +124,11 @@
</dependency>
<dependency>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-multipart</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
index 6f2974d..cd1d4e2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
@@ -18,6 +18,8 @@
package org.apache.falcon.resource.extensions;
+import com.sun.jersey.multipart.FormDataParam;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
@@ -25,12 +27,14 @@ import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.store.StoreAccessException;
import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.extensions.Extension;
import org.apache.falcon.extensions.ExtensionProperties;
import org.apache.falcon.extensions.ExtensionService;
+import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
-import org.apache.falcon.persistence.ExtensionMetadataBean;
+import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractSchedulableEntityManager;
import org.apache.falcon.resource.EntityList;
@@ -58,6 +62,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import java.io.IOException;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -76,7 +81,6 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
public static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name=";
public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
- public static final String TAG_SEPARATOR = ",";
public static final String ASCENDING_SORT_ORDER = "asc";
public static final String DESCENDING_SORT_ORDER = "desc";
@@ -290,18 +294,19 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
@POST
@Path("submit/{extension-name}")
- @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
public APIResult submit(
@PathParam("extension-name") String extensionName,
@Context HttpServletRequest request,
- @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+ @DefaultValue("") @QueryParam("doAs") String doAsUser,
+ @QueryParam("jobName") String jobName,
+ @FormDataParam("entities") List<Entity> entities,
+ @FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
try {
- List<Entity> entities = generateEntities(extensionName, request);
- for (Entity entity : entities) {
- submitInternal(entity, doAsUser);
- }
+ entities = getEntityList(extensionName, entities, config);
+ submitEntities(extensionName, doAsUser, jobName, entities, config);
} catch (FalconException | IOException e) {
LOG.error("Error when submitting extension job: ", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
@@ -309,20 +314,37 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted successfully");
}
+ private void validateEntities(List<Entity> entities) throws FalconException { // feeds and processes only
+ for (Entity entity : entities) {
+ if (!EntityType.FEED.equals(entity.getEntityType()) && !EntityType.PROCESS.equals(entity.getEntityType())) {
+ LOG.error("Cluster entity is not allowed for submission via submitEntities: {}", entity.getName());
+ throw new FalconException("Cluster entity is not allowed for submission in extensions submission");
+ }
+ super.validate(entity);
+ }
+ }
+
+ private ExtensionType getExtensionType(String extensionName) {
+ // Look the extension up in the metadata store and report the type recorded for it.
+ ExtensionBean registered = ExtensionStore.get().getMetaStore().getDetail(extensionName);
+ return registered.getExtensionType();
+ }
+
@POST
@Path("submitAndSchedule/{extension-name}")
- @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.MULTIPART_FORM_DATA})
@Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON})
public APIResult submitAndSchedule(
@PathParam("extension-name") String extensionName,
@Context HttpServletRequest request,
- @DefaultValue("") @QueryParam("doAs") String doAsUser) {
+ @DefaultValue("") @QueryParam("doAs") String doAsUser,
+ @QueryParam("jobName") String jobName,
+ @FormDataParam("entities") List<Entity> entities,
+ @FormDataParam("config") InputStream config) {
checkIfExtensionServiceIsEnabled();
try {
- List<Entity> entities = generateEntities(extensionName, request);
- for (Entity entity : entities) {
- submitInternal(entity, doAsUser);
- }
+ entities = getEntityList(extensionName, entities, config);
+ submitEntities(extensionName, doAsUser, jobName, entities, config);
for (Entity entity : entities) {
scheduleInternal(entity.getEntityType().name(), entity.getName(), null, null);
}
@@ -333,6 +355,33 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
return new APIResult(APIResult.Status.SUCCEEDED, "Extension job submitted and scheduled successfully");
}
+ private void submitEntities(String extensionName, String doAsUser, String jobName, List<Entity> entities,
+ InputStream configStream) throws FalconException, IOException {
+ validateEntities(entities);
+ List<String> feeds = new ArrayList<>();
+ List<String> processes = new ArrayList<>();
+ for (Entity entity : entities) {
+ submitInternal(entity, doAsUser);
+ if (EntityType.FEED.equals(entity.getEntityType())) {
+ feeds.add(entity.getName());
+ } else if (EntityType.PROCESS.equals(entity.getEntityType())) {
+ processes.add(entity.getName());
+ }
+ }
+ // Entities are already submitted here: a null stream (presumably no "config" part sent) must not abort the job record.
+ byte[] configBytes = configStream == null ? new byte[0] : IOUtils.toByteArray(configStream);
+ ExtensionStore.get().getMetaStore().storeExtensionJob(jobName, extensionName, feeds, processes, configBytes);
+ }
+
+ private List<Entity> getEntityList(String extensionName, List<Entity> entities, InputStream config)
+ throws FalconException, IOException {
+ // Trusted extensions have their entities generated from the config; otherwise the given list is returned as is.
+ if (ExtensionType.TRUSTED.equals(getExtensionType(extensionName))) {
+ return generateEntities(extensionName, config);
+ }
+ return entities;
+ }
+
@POST
@Path("update/{extension-name}")
@Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@@ -343,7 +392,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
try {
- List<Entity> entities = generateEntities(extensionName, request);
+ List<Entity> entities = generateEntities(extensionName, request.getInputStream());
for (Entity entity : entities) {
super.update(entity, entity.getEntityType().name(), entity.getName(), null);
}
@@ -364,7 +413,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
try {
- List<Entity> entities = generateEntities(extensionName, request);
+ List<Entity> entities = generateEntities(extensionName, request.getInputStream());
for (Entity entity : entities) {
super.validate(entity);
}
@@ -385,8 +434,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
JSONArray results;
try {
- List<String> extensions = ExtensionStore.get().getExtensions();
- results = buildEnumerateResult(extensions);
+ results = buildEnumerateResult();
} catch (StoreAccessException e) {
LOG.error("Failed when accessing extension store.", e);
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
@@ -442,7 +490,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
try {
- return ExtensionStore.get().deleteExtensionMetadata(extensionName);
+ return ExtensionStore.get().deleteExtension(extensionName);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
@@ -454,11 +502,12 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
@Produces(MediaType.TEXT_PLAIN)
public String registerExtensionMetadata(
@PathParam("extension-name") String extensionName,
- @QueryParam("path") String path, @QueryParam("description") String description){
+ @QueryParam("path") String path,
+ @QueryParam("description") String description){
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
try {
- return ExtensionStore.get().registerExtensionMetadata(extensionName, path, description);
+ return ExtensionStore.get().registerExtension(extensionName, path, description);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
@@ -486,18 +535,18 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
}
}
- private static JSONArray buildEnumerateResult(final List<String> extensions) throws FalconException {
+ private static JSONArray buildEnumerateResult() throws FalconException {
JSONArray results = new JSONArray();
- ExtensionMetaStore metricStore = ExtensionStore.get().getMetaStore();
- List<ExtensionMetadataBean> beanList = metricStore.getAllExtensions();
- for (ExtensionMetadataBean bean : beanList) {
+ ExtensionMetaStore metaStore = ExtensionStore.get().getMetaStore();
+ List<ExtensionBean> extensionBeanList = metaStore.getAllExtensions();
+ for (ExtensionBean extensionBean : extensionBeanList) {
JSONObject resultObject = new JSONObject();
try {
- resultObject.put(EXTENSION_NAME, bean.getExtensionName().toLowerCase());
- resultObject.put(EXTENSION_TYPE, bean.getExtensionType());
- resultObject.put(EXTENSION_DESC, bean.getDescription());
- resultObject.put(EXTENSION_LOCATION, bean.getLocation());
+ resultObject.put(EXTENSION_NAME, extensionBean.getExtensionName().toLowerCase());
+ resultObject.put(EXTENSION_TYPE, extensionBean.getExtensionType());
+ resultObject.put(EXTENSION_DESC, extensionBean.getDescription());
+ resultObject.put(EXTENSION_LOCATION, extensionBean.getLocation());
} catch (JSONException e) {
throw new FalconException(e);
}
@@ -507,12 +556,12 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
return results;
}
- private List<Entity> generateEntities(String extensionName, HttpServletRequest request)
+ private List<Entity> generateEntities(String extensionName, InputStream configStream)
throws FalconException, IOException {
// get entities for extension job
Properties properties = new Properties();
- properties.load(request.getInputStream());
- List<Entity> entities = extension.getEntities(extensionName, request.getInputStream());
+ properties.load(configStream);
+ List<Entity> entities = extension.getEntities(extensionName, configStream);
// add tags on extension name and job
String jobName = properties.getProperty(ExtensionProperties.JOB_NAME.getName());
@@ -528,7 +577,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
throw new ValidationException("No extension resources found for " + extensionName);
}
- ExtensionMetadataBean bean = metaStore.getDetail(extensionName);
+ ExtensionBean bean = metaStore.getDetail(extensionName);
JSONObject resultObject = new JSONObject();
try {
resultObject.put(EXTENSION_NAME, bean.getExtensionName());
http://git-wip-us.apache.org/repos/asf/falcon/blob/49fa46e2/src/build/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/src/build/findbugs-exclude.xml b/src/build/findbugs-exclude.xml
index d910c96..04e267f 100644
--- a/src/build/findbugs-exclude.xml
+++ b/src/build/findbugs-exclude.xml
@@ -70,7 +70,12 @@
</Match>
<Match>
- <Class name="org.apache.falcon.persistence.ExtensionMetadataBean" />
+ <Class name="org.apache.falcon.persistence.ExtensionBean" />
+ <Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
+ </Match>
+
+ <Match>
+ <Class name="org.apache.falcon.persistence.ExtensionJobsBean" />
<Bug pattern="NP_BOOLEAN_RETURN_NULL,UWF_UNWRITTEN_FIELD" />
</Match>