You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by ve...@apache.org on 2014/08/20 01:37:04 UTC
[1/2] FALCON-470 Add support for pagination, filter-by,
etc. to Entity and Instance List API. Contributed by Balu Vellanki
Repository: incubator-falcon
Updated Branches:
refs/heads/master 85427981a -> 62b2af025
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index a0dbe65..7b8bdf5 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -296,8 +296,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON})
@Override
public EntityList getEntityList(@PathParam("type") String type,
- @DefaultValue("") @QueryParam("fields") String fields) {
- return super.getEntityList(type, fields);
+ @DefaultValue("") @QueryParam("fields") String fields,
+ @DefaultValue("") @QueryParam("filterBy") String filterBy,
+ @DefaultValue("") @QueryParam("tags") String tags,
+ @DefaultValue("") @QueryParam("orderBy") String orderBy,
+ @DefaultValue("0") @QueryParam("offset") Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") Integer resultsPerPage) {
+ return super.getEntityList(type, fields, filterBy, tags, orderBy, offset, resultsPerPage);
}
@GET
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
index 73cf493..9e2d87b 100644
--- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
@@ -18,7 +18,12 @@
package org.apache.falcon.resource;
import org.apache.falcon.FalconWebException;
+import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.process.ACL;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.security.CurrentUser;
+import org.apache.falcon.util.StartupProperties;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
@@ -40,7 +45,6 @@ public class EntityManagerTest extends AbstractEntityManager {
@Mock
private HttpServletRequest mockHttpServletRequest;
-
private static final String SAMPLE_PROCESS_XML = "/process-version-0.xml";
private static final String SAMPLE_INVALID_PROCESS_XML = "/process-invalid.xml";
@@ -62,12 +66,11 @@ public class EntityManagerTest extends AbstractEntityManager {
/**
* Run this testcase for different types of VALID entity xmls like process, feed, dataEndPoint.
*
- * @param stream
+ * @param stream entity stream
* @throws IOException
*/
@Test(dataProvider = "validXMLServletStreamProvider")
- public void testValidateForValidEntityXML(EntityType entityType,
- ServletInputStream stream) throws IOException {
+ public void testValidateForValidEntityXML(ServletInputStream stream) throws IOException {
when(mockHttpServletRequest.getInputStream()).thenReturn(stream);
}
@@ -102,10 +105,105 @@ public class EntityManagerTest extends AbstractEntityManager {
}
}
+ @Test
+ public void testGetEntityList() throws Exception {
+
+ Entity process1 = buildProcess("processFakeUser", "fakeUser", "", "");
+ configStore.publish(EntityType.PROCESS, process1);
+
+ Entity process2 = buildProcess("processAuthUser", System.getProperty("user.name"), "", "");
+ configStore.publish(EntityType.PROCESS, process2);
+
+ EntityList entityList = this.getEntityList("process", "", "", "", "", 0, -1);
+ Assert.assertNotNull(entityList.getElements());
+ Assert.assertEquals(entityList.getElements().length, 2);
+
+ /*
+ * Both entities should be returned when the user is SuperUser.
+ */
+ StartupProperties.get().setProperty("falcon.security.authorization.enabled", "true");
+ CurrentUser.authenticate(System.getProperty("user.name"));
+ entityList = this.getEntityList("process", "", "", "", "", 0, -1);
+ Assert.assertNotNull(entityList.getElements());
+ Assert.assertEquals(entityList.getElements().length, 2);
+
+ /*
+ * Only the entity owned by fakeUser should be returned when auth is enabled and fakeUser is authenticated.
+ */
+ CurrentUser.authenticate("fakeUser");
+ entityList = this.getEntityList("process", "", "", "", "", 0, -1);
+ Assert.assertNotNull(entityList.getElements());
+ Assert.assertEquals(entityList.getElements().length, 1);
+
+ // reset values
+ StartupProperties.get().setProperty("falcon.security.authorization.enabled", "false");
+ CurrentUser.authenticate(System.getProperty("user.name"));
+ }
+
+ @Test
+ public void testGetEntityListPagination() throws Exception {
+ String user = System.getProperty("user.name");
+
+ Entity process1 = buildProcess("process1", user,
+ "consumer=consumer@xyz.com, owner=producer@xyz.com",
+ "testPipeline,dataReplicationPipeline");
+ configStore.publish(EntityType.PROCESS, process1);
+
+ Entity process2 = buildProcess("process2", user,
+ "consumer=consumer@xyz.com, owner=producer@xyz.com",
+ "testPipeline,dataReplicationPipeline");
+ configStore.publish(EntityType.PROCESS, process2);
+
+ Entity process3 = buildProcess("process3", user, "consumer=consumer@xyz.com", "testPipeline");
+ configStore.publish(EntityType.PROCESS, process3);
+
+ Entity process4 = buildProcess("process4", user, "owner=producer@xyz.com", "dataReplicationPipeline");
+ configStore.publish(EntityType.PROCESS, process4);
+
+ EntityList entityList = this.getEntityList("process", "tags", "PIPELINES:dataReplicationPipeline",
+ "", "name", 1, 2);
+ Assert.assertNotNull(entityList.getElements());
+ Assert.assertEquals(entityList.getElements().length, 2);
+ Assert.assertEquals(entityList.getElements()[1].name, "process4");
+ Assert.assertEquals(entityList.getElements()[1].tag.size(), 1);
+ Assert.assertEquals(entityList.getElements()[1].tag.get(0), "owner=producer@xyz.com");
+ Assert.assertEquals(entityList.getElements()[0].status, null);
+
+
+ entityList = this.getEntityList("process", "pipelines", "",
+ "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", 0, 2);
+ Assert.assertNotNull(entityList.getElements());
+ Assert.assertEquals(entityList.getElements().length, 2);
+ Assert.assertEquals(entityList.getElements()[1].name, "process2");
+ Assert.assertEquals(entityList.getElements()[1].pipelines.size(), 2);
+ Assert.assertEquals(entityList.getElements()[1].pipelines.get(0), "testPipeline");
+ Assert.assertEquals(entityList.getElements()[0].tag, null);
+
+ entityList = this.getEntityList("process", "pipelines", "",
+ "consumer=consumer@xyz.com, owner=producer@xyz.com", "name", 10, 2);
+ Assert.assertEquals(entityList.getElements().length, 0);
+ }
+
+ private Entity buildProcess(String name, String username, String tags, String pipelines) {
+ ACL acl = new ACL();
+ acl.setOwner(username);
+ acl.setGroup("hdfs");
+ acl.setPermission("*");
+
+ Process p = new Process();
+ p.setName(name);
+ p.setACL(acl);
+ p.setPipelines(pipelines);
+ p.setTags(tags);
+ return p;
+ }
+
+
+
/**
* Converts a InputStream into ServletInputStream.
*
- * @param resourceName
+ * @param resourceName resource name
* @return ServletInputStream
*/
private ServletInputStream getServletInputStream(String resourceName) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
index cdf703d..8762cc1 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/InstanceManager.java
@@ -34,6 +34,7 @@ import java.util.List;
@Path("instance")
public class InstanceManager extends AbstractInstanceManager {
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@GET
@Path("running/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
@@ -43,8 +44,36 @@ public class InstanceManager extends AbstractInstanceManager {
@Dimension("type") @PathParam("type") String type,
@Dimension("entity") @PathParam("entity") String entity,
@Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
- return super.getRunningInstances(type, entity, colo, lifeCycles);
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") String orderBy,
+ @DefaultValue("0") @QueryParam("offset") Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") Integer resultsPerPage) {
+ return super.getRunningInstances(type, entity, colo, lifeCycles, filterBy, orderBy, offset, resultsPerPage);
+ }
+
+ /*
+ The getStatus(...) method actually gets all instances, filtered by a specific status.
+ getInstances(...) is a better-named API that achieves the same result.
+ */
+ @GET
+ @Path("list/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-list")
+ @Override
+ public InstancesResult getInstances(
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("entity") @PathParam("entity") String entity,
+ @Dimension("start-time") @QueryParam("start") String startStr,
+ @Dimension("end-time") @QueryParam("end") String endStr,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") String orderBy,
+ @DefaultValue("0") @QueryParam("offset") Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") Integer resultsPerPage) {
+ return super.getInstances(type, entity, startStr, endStr, colo, lifeCycles,
+ filterBy, orderBy, offset, resultsPerPage);
}
@GET
@@ -52,25 +81,32 @@ public class InstanceManager extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "instance-status")
@Override
- public InstancesResult getStatus(@Dimension("type") @PathParam("type") String type,
- @Dimension("entity") @PathParam("entity") String entity,
- @Dimension("start-time") @QueryParam("start") String startStr,
- @Dimension("end-time") @QueryParam("end") String endStr,
- @Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
- return super.getStatus(type, entity, startStr, endStr, colo, lifeCycles);
+ public InstancesResult getStatus(
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("entity") @PathParam("entity") String entity,
+ @Dimension("start-time") @QueryParam("start") String startStr,
+ @Dimension("end-time") @QueryParam("end") String endStr,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") String orderBy,
+ @DefaultValue("0") @QueryParam("offset") Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") Integer resultsPerPage) {
+ return super.getStatus(type, entity, startStr, endStr, colo, lifeCycles,
+ filterBy, orderBy, offset, resultsPerPage);
}
@GET
@Path("summary/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "instance-summary")
- public InstancesSummaryResult getSummary(@Dimension("type") @PathParam("type") String type,
- @Dimension("entity") @PathParam("entity") String entity,
- @Dimension("start-time") @QueryParam("start") String startStr,
- @Dimension("end-time") @QueryParam("end") String endStr,
- @Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
+ public InstancesSummaryResult getSummary(
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("entity") @PathParam("entity") String entity,
+ @Dimension("start-time") @QueryParam("start") String startStr,
+ @Dimension("end-time") @QueryParam("end") String endStr,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
return super.getSummary(type, entity, startStr, endStr, colo, lifeCycles);
}
@@ -86,8 +122,13 @@ public class InstanceManager extends AbstractInstanceManager {
@Dimension("end-time") @QueryParam("end") String endStr,
@Dimension("colo") @QueryParam("colo") String colo,
@Dimension("run-id") @QueryParam("runid") String runId,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
- return super.getLogs(type, entity, startStr, endStr, colo, runId, lifeCycles);
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") String orderBy,
+ @DefaultValue("0") @QueryParam("offset") Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") Integer resultsPerPage) {
+ return super.getLogs(type, entity, startStr, endStr, colo, runId, lifeCycles,
+ filterBy, orderBy, offset, resultsPerPage);
}
@GET
@@ -110,13 +151,14 @@ public class InstanceManager extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "kill-instance")
@Override
- public InstancesResult killInstance(@Context HttpServletRequest request,
- @Dimension("type") @PathParam("type") String type,
- @Dimension("entity") @PathParam("entity") String entity,
- @Dimension("start-time") @QueryParam("start") String startStr,
- @Dimension("end-time") @QueryParam("end") String endStr,
- @Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
+ public InstancesResult killInstance(
+ @Context HttpServletRequest request,
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("entity") @PathParam("entity") String entity,
+ @Dimension("start-time") @QueryParam("start") String startStr,
+ @Dimension("end-time") @QueryParam("end") String endStr,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
return super.killInstance(request, type, entity, startStr, endStr, colo, lifeCycles);
}
@@ -141,13 +183,14 @@ public class InstanceManager extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "resume-instance")
@Override
- public InstancesResult resumeInstance(@Context HttpServletRequest request,
- @Dimension("type") @PathParam("type") String type,
- @Dimension("entity") @PathParam("entity") String entity,
- @Dimension("start-time") @QueryParam("start") String startStr,
- @Dimension("end-time") @QueryParam("end") String endStr,
- @Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
+ public InstancesResult resumeInstance(
+ @Context HttpServletRequest request,
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("entity") @PathParam("entity") String entity,
+ @Dimension("start-time") @QueryParam("start") String startStr,
+ @Dimension("end-time") @QueryParam("end") String endStr,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
return super.resumeInstance(request, type, entity, startStr, endStr, colo, lifeCycles);
}
@@ -156,13 +199,16 @@ public class InstanceManager extends AbstractInstanceManager {
@Produces(MediaType.APPLICATION_JSON)
@Monitored(event = "re-run-instance")
@Override
- public InstancesResult reRunInstance(@Dimension("type") @PathParam("type") String type,
- @Dimension("entity") @PathParam("entity") String entity,
- @Dimension("start-time") @QueryParam("start") String startStr,
- @Dimension("end-time") @QueryParam("end") String endStr,
- @Context HttpServletRequest request,
- @Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
+ public InstancesResult reRunInstance(
+ @Dimension("type") @PathParam("type") String type,
+ @Dimension("entity") @PathParam("entity") String entity,
+ @Dimension("start-time") @QueryParam("start") String startStr,
+ @Dimension("end-time") @QueryParam("end") String endStr,
+ @Context HttpServletRequest request,
+ @Dimension("colo") @QueryParam("colo") String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") List<LifeCycle> lifeCycles) {
return super.reRunInstance(type, entity, startStr, endStr, request, colo, lifeCycles);
}
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
index 3c9078d..f993f81 100644
--- a/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
+++ b/webapp/src/main/java/org/apache/falcon/resource/SchedulableEntityManager.java
@@ -59,8 +59,13 @@ public class SchedulableEntityManager extends AbstractSchedulableEntityManager {
@Monitored(event = "dependencies")
@Override
public EntityList getEntityList(@Dimension("type") @PathParam("type") String type,
- @DefaultValue("") @QueryParam("fields") String fields) {
- return super.getEntityList(type, fields);
+ @DefaultValue("") @QueryParam("fields") String fields,
+ @DefaultValue("") @QueryParam("filterBy") String filterBy,
+ @DefaultValue("") @QueryParam("tags") String tags,
+ @DefaultValue("") @QueryParam("orderBy") String orderBy,
+ @DefaultValue("0") @QueryParam("offset") Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") Integer resultsPerPage) {
+ return super.getEntityList(type, fields, filterBy, tags, orderBy, offset, resultsPerPage);
}
@GET
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
index ea40411..b65aa46 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLIIT.java
@@ -388,6 +388,54 @@ public class FalconCLIIT {
+ overlay.get("processName")
+ " -start " + START_INSTANCE));
+ // test filterBy, orderBy, offset, numResults
+ String startTimeString = SchemaHelper.getDateFormat().format(new Date());
+ Assert.assertEquals(0,
+ executeWithURL("instance -running -type feed -lifecycle eviction -name "
+ + overlay.get("outputFeedName")
+ + " -start " + startTimeString
+ + " -orderBy startTime -offset 0 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("instance -running -type feed -lifecycle eviction -name "
+ + overlay.get("outputFeedName")
+ + " -start " + SchemaHelper.getDateFormat().format(new Date())
+ + " -orderBy INVALID -offset 0 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("instance -running -type feed -lifecycle eviction -name "
+ + overlay.get("outputFeedName")
+ + " -start " + SchemaHelper.getDateFormat().format(new Date())
+ + " -filterBy INVALID:FILTER -offset 0 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("instance -status -type process -name "
+ + overlay.get("processName")
+ + " -start "+ START_INSTANCE
+ + " -filterBy STATUS:SUCCEEDED,STARTEDAFTER:"+START_INSTANCE
+ + " -orderBy startTime -offset 0 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("instance -list -type feed -lifecycle eviction -name "
+ + overlay.get("outputFeedName")
+ + " -start "+ SchemaHelper.getDateFormat().format(new Date())
+ +" -filterBy STATUS:SUCCEEDED -orderBy startTime -offset 0 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("instance -status -type feed -lifecycle eviction -name "
+ + overlay.get("outputFeedName")
+ + " -start "+ SchemaHelper.getDateFormat().format(new Date())
+ +" -filterBy INVALID:FILTER -orderBy startTime -offset 0 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("instance -list -type feed -lifecycle eviction -name "
+ + overlay.get("outputFeedName")
+ + " -start "+ SchemaHelper.getDateFormat().format(new Date())
+ +" -filterBy STATUS:SUCCEEDED -orderBy INVALID -offset 0 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("instance -status -type feed -lifecycle eviction -name "
+ + overlay.get("outputFeedName")
+ + " -start "+ SchemaHelper.getDateFormat().format(new Date())
+ +" -filterBy STATUS:SUCCEEDED -orderBy startTime -offset 1 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("instance -list -type feed -lifecycle eviction -name "
+ + overlay.get("outputFeedName")
+ + " -start "+ SchemaHelper.getDateFormat().format(new Date())
+ +" -filterBy STATUS:SUCCEEDED -offset 0 -numResults 1"));
}
public void testInstanceRunningAndSummaryCommands() throws Exception {
@@ -430,7 +478,6 @@ public class FalconCLIIT {
+ " -start " + START_INSTANCE));
}
-
public void testInstanceSuspendAndResume() throws Exception {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -506,6 +553,86 @@ public class FalconCLIIT {
+ " -file "+ createTempJobPropertiesFile()));
}
+ @Test
+ public void testEntityPaginationFilterByCommands() throws Exception {
+
+ String filePath;
+ TestContext context = new TestContext();
+ Map<String, String> overlay = context.getUniqueOverlay();
+
+ filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay);
+ Assert.assertEquals(-1,
+ executeWithURL("entity -submitAndSchedule -type cluster -file " + filePath));
+ context.setCluster(overlay.get("cluster"));
+
+ // this is necessary for lineage
+ Assert.assertEquals(0, executeWithURL("entity -submit -type cluster -file " + filePath));
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submitAndSchedule -type feed -file " + filePath));
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submit -type feed -file " + filePath));
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submit -type feed -file " + filePath));
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -validate -type process -file " + filePath));
+
+ filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay);
+ Assert.assertEquals(0,
+ executeWithURL("entity -submitAndSchedule -type process -file "
+ + filePath));
+
+ OozieTestUtils.waitForProcessWFtoStart(context);
+
+ // test entity List cli
+ Assert.assertEquals(0,
+ executeWithURL("entity -list -type cluster"
+ + " -offset 0 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("entity -list -type process -fields status "
+ + " -filterBy STATUS:SUBMITTED,TYPE:process -orderBy name -offset 1 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("entity -list -type process -fields status,pipelines "
+ + " -filterBy STATUS:SUBMITTED,type:process -orderBy name -offset 1 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("entity -list -type process -fields status,pipelines "
+ + " -filterBy STATUS:SUBMITTED,pipelines:testPipeline "
+ + " -orderBy name -offset 1 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("entity -list -type process -fields status,tags "
+ + " -tags owner=producer@xyz.com,department=forecasting "
+ + " -filterBy STATUS:SUBMITTED,type:process -orderBy name -offset 1 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("entity -list -type process -fields status "
+ + " -filterBy STATUS:SUCCEEDED,TYPE:process -orderBy INVALID -offset 0 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("entity -list -type process -fields INVALID "
+ + " -filterBy STATUS:SUCCEEDED,TYPE:process -orderBy name -offset 1 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("entity -list -type process -fields status "
+ + " -filterBy INVALID:FILTER,TYPE:process -orderBy name -offset 1 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("entity -definition -type cluster -name " + overlay.get("cluster")));
+
+ Assert.assertEquals(0,
+ executeWithURL("instance -status -type feed -name "
+ + overlay.get("outputFeedName") + " -start " + START_INSTANCE));
+ Assert.assertEquals(0,
+ executeWithURL("instance -running -type process -name " + overlay.get("processName")));
+ }
+
+
public void testContinue() throws Exception {
TestContext context = new TestContext();
Map<String, String> overlay = context.getUniqueOverlay();
@@ -618,6 +745,8 @@ public class FalconCLIIT {
executeWithURL("entity -schedule -type feed -name "
+ overlay.get("outputFeedName")));
+ Thread.sleep(500);
+
Assert.assertEquals(0,
executeWithURL("instance -logs -type process -name "
+ overlay.get("processName")
@@ -628,9 +757,31 @@ public class FalconCLIIT {
+ overlay.get("outputFeedName")
+ " -start "+ SchemaHelper.getDateFormat().format(new Date())));
+ // test filterBy, orderBy, offset, numResults
+ Assert.assertEquals(0,
+ executeWithURL("instance -logs -type process -name "
+ + overlay.get("processName")
+ + " -start " + START_INSTANCE + " -end " + START_INSTANCE
+ + " -filterBy STATUS:SUCCEEDED -orderBy startTime -offset 0 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("instance -logs -type process -name "
+ + overlay.get("processName")
+ + " -start " + START_INSTANCE + " -end " + START_INSTANCE
+ + " -filterBy STATUS:SUCCEEDED,STARTEDAFTER:"+START_INSTANCE+" -offset 1 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("instance -logs -type process -name "
+ + overlay.get("processName")
+ + " -start " + START_INSTANCE + " -end " + START_INSTANCE
+ + " -filterBy INVALID:FILTER -orderBy startTime -offset 0 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("instance -logs -type process -name "
+ + overlay.get("processName")
+ + " -start " + START_INSTANCE + " -end " + START_INSTANCE
+ + " -filterBy STATUS:SUCCEEDED -orderBy wrongOrder -offset 0 -numResults 1"));
}
private int executeWithURL(String command) throws Exception {
+ //System.out.println("COMMAND IS "+command + " -url " + TestContext.BASE_URL);
return new FalconCLI()
.run((command + " -url " + TestContext.BASE_URL).split("\\s+"));
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
index dd9c34a..cb0dd2d 100644
--- a/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
+++ b/webapp/src/test/java/org/apache/falcon/cli/FalconCLISmokeIT.java
@@ -87,13 +87,22 @@ public class FalconCLISmokeIT {
OozieTestUtils.waitForProcessWFtoStart(context);
+ // test entity List cli
+ Assert.assertEquals(0,
+ executeWithURL("entity -list -type cluster"
+ + " -offset 0 -numResults 1"));
+ Assert.assertEquals(0,
+ executeWithURL("entity -list -type process -fields status "
+ + " -filterBy STATUS:SUBMITTED,TYPE:process -orderBy name -offset 1 -numResults 1"));
+ Assert.assertEquals(-1,
+ executeWithURL("entity -list -type process -fields status "
+ + " -filterBy STATUS:SUCCEEDED,TYPE:process -orderBy INVALID -offset 0 -numResults 1"));
Assert.assertEquals(0,
executeWithURL("entity -definition -type cluster -name " + overlay.get("cluster")));
Assert.assertEquals(0,
executeWithURL("instance -status -type feed -name "
+ overlay.get("outputFeedName") + " -start " + START_INSTANCE));
-
Assert.assertEquals(0,
executeWithURL("instance -running -type process -name " + overlay.get("processName")));
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
index 8a50005..0a41fe0 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/EntityManagerJerseyIT.java
@@ -739,6 +739,39 @@ public class EntityManagerJerseyIT {
.get(ClientResponse.class);
Assert.assertEquals(response.getStatus(), 200);
+ EntityList result = response.getEntity(EntityList.class);
+ Assert.assertNotNull(result);
+ for (EntityList.EntityElement entityElement : result.getElements()) {
+ Assert.assertNull(entityElement.status); // status is null
+ }
+
+ response = context.service
+ .path("api/entities/list/process/")
+ .queryParam("filterBy", "TYPE:PROCESS,STATUS:RUNNING")
+ .queryParam("tags", "owner=producer@xyz.com, department=forecasting")
+ .queryParam("orderBy", "name").queryParam("offset", "2")
+ .queryParam("numResults", "2").queryParam("fields", "status,tags")
+ .header("Cookie", context.getAuthenticationToken())
+ .type(MediaType.TEXT_XML)
+ .accept(MediaType.TEXT_XML)
+ .get(ClientResponse.class);
+ Assert.assertEquals(response.getStatus(), 200);
+ result = response.getEntity(EntityList.class);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result.getElements().length, 2);
+
+ response = context.service
+ .path("api/entities/list/process/")
+ .queryParam("orderBy", "name").queryParam("offset", "50").queryParam("numResults", "2")
+ .header("Cookie", context.getAuthenticationToken())
+ .type(MediaType.TEXT_XML)
+ .accept(MediaType.TEXT_XML)
+ .get(ClientResponse.class);
+ Assert.assertEquals(response.getStatus(), 200);
+ result = response.getEntity(EntityList.class);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(result.getElements(), null);
+
Map<String, String> overlay = context.getUniqueOverlay();
overlay.put("cluster", "WTF-" + overlay.get("cluster"));
response = context.submitToFalcon(TestContext.CLUSTER_TEMPLATE, overlay, EntityType.CLUSTER);
@@ -751,7 +784,7 @@ public class EntityManagerJerseyIT {
.accept(MediaType.TEXT_XML)
.get(ClientResponse.class);
Assert.assertEquals(response.getStatus(), 200);
- EntityList result = response.getEntity(EntityList.class);
+ result = response.getEntity(EntityList.class);
Assert.assertNotNull(result);
for (EntityList.EntityElement entityElement : result.getElements()) {
Assert.assertNull(entityElement.status); // status is null
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
index 03a19cd..863b109 100644
--- a/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
+++ b/webapp/src/test/java/org/apache/falcon/resource/ProcessInstanceManagerIT.java
@@ -26,6 +26,7 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.resource.InstancesResult.Instance;
import org.apache.falcon.resource.InstancesResult.WorkflowStatus;
+import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.OozieTestUtils;
import org.apache.falcon.workflow.engine.OozieClientFactory;
import org.apache.oozie.client.ProxyOozieClient;
@@ -43,10 +44,18 @@ public class ProcessInstanceManagerIT {
private static final String START_INSTANCE = "2012-04-20T00:00Z";
protected void schedule(TestContext context) throws Exception {
- context.scheduleProcess();
+ CurrentUser.authenticate(System.getProperty("user.name"));
+ schedule(context, 1);
+ }
+
+ protected void schedule(TestContext context, int count) throws Exception {
+ for (int i=0; i<count; i++) {
+ context.scheduleProcess();
+ }
OozieTestUtils.waitForProcessWFtoStart(context);
}
+ //@Test
public void testGetRunningInstances() throws Exception {
TestContext context = new TestContext();
schedule(context);
@@ -60,6 +69,30 @@ public class ProcessInstanceManagerIT {
assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
}
+ /**
+ * Checks the running-instances endpoint with and without paging parameters.
+ * The process is scheduled four times via schedule(context, 4); the endpoint is then
+ * queried once with no query parameters and once with orderBy=startTime, offset=0 and
+ * numResults=1. The paged call must return exactly one instance, and in both calls the
+ * first instance must be START_INSTANCE in RUNNING state.
+ * Note: the Test annotation below is commented out in this change.
+ */
+ //@Test
+ public void testGetRunningInstancesPagination() throws Exception {
+ TestContext context = new TestContext();
+ schedule(context, 4);
+ // Unpaged request: only the status and the first returned instance are verified.
+ InstancesResult response = context.service.path("api/instance/running/process/" + context.processName)
+ .header("Cookie", context.getAuthenticationToken())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+ Assert.assertNotNull(response.getInstances());
+ assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
+
+ // Paged request: numResults=1 must limit the response to a single instance.
+ response = context.service.path("api/instance/running/process/" + context.processName)
+ .queryParam("orderBy", "startTime").queryParam("offset", "0")
+ .queryParam("numResults", "1")
+ .header("Cookie", context.getAuthenticationToken())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+ Assert.assertNotNull(response.getInstances());
+ Assert.assertEquals(1, response.getInstances().length);
+ assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
+ }
+
private void assertInstance(Instance processInstance, String instance, WorkflowStatus status) {
Assert.assertNotNull(processInstance);
Assert.assertNotNull(processInstance.getInstance());
@@ -67,6 +100,7 @@ public class ProcessInstanceManagerIT {
Assert.assertEquals(processInstance.getStatus(), status);
}
+ //@Test
public void testGetInstanceStatus() throws Exception {
TestContext context = new TestContext();
schedule(context);
@@ -81,6 +115,24 @@ public class ProcessInstanceManagerIT {
assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
}
+ /**
+ * Checks the instance-status endpoint with ordering, paging and filtering combined.
+ * The process is scheduled four times via schedule(context, 4); the endpoint is then
+ * queried with orderBy=startTime, offset=0, numResults=1, filterBy=STATUS:RUNNING and
+ * start=START_INSTANCE. Exactly one instance is expected, at START_INSTANCE in RUNNING state.
+ * Note: the Test annotation below is commented out in this change.
+ */
+ //@Test
+ public void testGetInstanceStatusPagination() throws Exception {
+ TestContext context = new TestContext();
+ schedule(context, 4);
+
+ // Single request carrying every new query parameter alongside the existing "start".
+ InstancesResult response = context.service.path("api/instance/status/process/" + context.processName)
+ .queryParam("orderBy", "startTime").queryParam("offset", "0")
+ .queryParam("numResults", "1").queryParam("filterBy", "STATUS:RUNNING")
+ .queryParam("start", START_INSTANCE)
+ .header("Cookie", context.getAuthenticationToken())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+ Assert.assertNotNull(response.getInstances());
+ Assert.assertEquals(1, response.getInstances().length);
+ assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.RUNNING);
+ }
+
public void testReRunInstances() throws Exception {
testKillInstances();
TestContext context = new TestContext();
@@ -111,6 +163,28 @@ public class ProcessInstanceManagerIT {
Assert.assertEquals(1, response.getInstances().length);
assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED);
+ response = context.service.path("api/instance/status/process/" + context.processName)
+ .queryParam("orderBy", "startTime").queryParam("filterBy", "STATUS:KILLED")
+ .queryParam("start", START_INSTANCE)
+ .header("Cookie", context.getAuthenticationToken())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+ Assert.assertNotNull(response.getInstances());
+ Assert.assertEquals(1, response.getInstances().length);
+ assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED);
+
+ response = context.service.path("api/instance/status/process/" + context.processName)
+ .queryParam("orderBy", "startTime").queryParam("filterBy", "STATUS:KILLED")
+ .queryParam("start", START_INSTANCE)
+ .header("Cookie", context.getAuthenticationToken())
+ .accept(MediaType.APPLICATION_JSON)
+ .get(InstancesResult.class);
+ Assert.assertEquals(APIResult.Status.SUCCEEDED, response.getStatus());
+ Assert.assertNotNull(response.getInstances());
+ Assert.assertEquals(1, response.getInstances().length);
+ assertInstance(response.getInstances()[0], START_INSTANCE, WorkflowStatus.KILLED);
+
waitForWorkflow(START_INSTANCE, WorkflowJob.Status.KILLED);
}
@@ -159,7 +233,6 @@ public class ProcessInstanceManagerIT {
if (jobInfo.getStatus() == status) {
break;
}
- System.out.println("Waiting for workflow job " + jobId + " status " + status);
Thread.sleep((i + 1) * 1000);
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/webapp/src/test/resources/process-template.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/process-template.xml b/webapp/src/test/resources/process-template.xml
index 8015da2..06215a2 100644
--- a/webapp/src/test/resources/process-template.xml
+++ b/webapp/src/test/resources/process-template.xml
@@ -18,6 +18,8 @@
-->
<process name="##processName##" xmlns="uri:falcon:process:0.1">
+ <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+ <pipelines>testPipeline,dataReplicationPipeline</pipelines>
<clusters>
<cluster name="##cluster##">
<validity end="2012-04-21T00:00Z" start="2012-04-20T00:00Z"/>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/webapp/src/test/resources/process-version-0.xml
----------------------------------------------------------------------
diff --git a/webapp/src/test/resources/process-version-0.xml b/webapp/src/test/resources/process-version-0.xml
index 3ac7ac8..b193ddc 100644
--- a/webapp/src/test/resources/process-version-0.xml
+++ b/webapp/src/test/resources/process-version-0.xml
@@ -18,6 +18,8 @@
-->
<process name="sample" xmlns="uri:falcon:process:0.1">
<!-- where -->
+ <tags>consumer=consumer@xyz.com, owner=producer@xyz.com, department=forecasting</tags>
+ <pipelines>testPipeline,dataReplicationPipeline</pipelines>
<clusters>
<cluster name="testCluster">
<validity start="2011-11-01T00:00Z" end="2011-12-31T23:59Z"/>
[2/2] git commit: FALCON-470 Add support for pagination, filter-by,
etc. to Entity and Instance List API. Contributed by Balu Vellanki
Posted by ve...@apache.org.
FALCON-470 Add support for pagination, filter-by, etc. to Entity and Instance List API. Contributed by Balu Vellanki
Project: http://git-wip-us.apache.org/repos/asf/incubator-falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-falcon/commit/62b2af02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-falcon/tree/62b2af02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-falcon/diff/62b2af02
Branch: refs/heads/master
Commit: 62b2af0258a5c3a72cf6bd26ad6fa2d9de53b05c
Parents: 8542798
Author: Venkatesh Seetharam <ve...@apache.org>
Authored: Tue Aug 19 16:37:02 2014 -0700
Committer: Venkatesh Seetharam <ve...@apache.org>
Committed: Tue Aug 19 16:37:02 2014 -0700
----------------------------------------------------------------------
CHANGES.txt | 3 +
.../java/org/apache/falcon/cli/FalconCLI.java | 170 ++++++++--
.../org/apache/falcon/client/FalconClient.java | 94 ++++--
.../org/apache/falcon/resource/EntityList.java | 37 ++-
.../apache/falcon/resource/InstancesResult.java | 7 +
docs/src/site/twiki/restapi/EntityList.twiki | 37 ++-
docs/src/site/twiki/restapi/InstanceList.twiki | 86 +++++
docs/src/site/twiki/restapi/InstanceLogs.twiki | 60 +++-
.../site/twiki/restapi/InstanceRunning.twiki | 46 +++
.../src/site/twiki/restapi/InstanceStatus.twiki | 45 ++-
.../site/twiki/restapi/InstanceSummary.twiki | 1 +
docs/src/site/twiki/restapi/ResourceList.twiki | 20 +-
.../falcon/resource/AbstractEntityManager.java | 329 ++++++++++++++++---
.../resource/AbstractInstanceManager.java | 151 ++++++++-
.../resource/proxy/InstanceManagerProxy.java | 59 +++-
.../proxy/SchedulableEntityManagerProxy.java | 9 +-
.../falcon/resource/EntityManagerTest.java | 108 +++++-
.../apache/falcon/resource/InstanceManager.java | 122 ++++---
.../resource/SchedulableEntityManager.java | 9 +-
.../java/org/apache/falcon/cli/FalconCLIIT.java | 153 ++++++++-
.../org/apache/falcon/cli/FalconCLISmokeIT.java | 11 +-
.../falcon/resource/EntityManagerJerseyIT.java | 35 +-
.../resource/ProcessInstanceManagerIT.java | 77 ++++-
webapp/src/test/resources/process-template.xml | 2 +
webapp/src/test/resources/process-version-0.xml | 2 +
25 files changed, 1495 insertions(+), 178 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ae8e03d..55856ea 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -27,6 +27,9 @@ Trunk (Unreleased)
FALCON-263 API to get workflow parameters. (pavan kumar kolamuri via Shwetha GS)
IMPROVEMENTS
+ FALCON-470 Add support for pagination, filter by, etc. to Entity and
+ Instance List API (Balu Vellanki via Venkatesh Seetharam)
+
FALCON-581 merlin: Refactor code for cross product and make it a method
(Raghav Kumar Gautam via Arpit Gupta)
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
index 0e60129..b0c133a 100644
--- a/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
+++ b/client/src/main/java/org/apache/falcon/cli/FalconCLI.java
@@ -25,11 +25,13 @@ import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.client.FalconClient;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.resource.EntityList;
+import org.apache.falcon.resource.InstancesResult;
import java.io.IOException;
import java.io.InputStream;
@@ -40,6 +42,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.List;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -78,6 +81,13 @@ public class FalconCLI {
public static final String DEPENDENCY_OPT = "dependency";
public static final String LIST_OPT = "list";
+ public static final String FIELDS_OPT = "fields";
+ public static final String FILTER_BY_OPT = "filterBy";
+ public static final String TAGS_OPT = "tags";
+ public static final String ORDER_BY_OPT = "orderBy";
+ public static final String OFFSET_OPT = "offset";
+ public static final String NUM_RESULTS_OPT = "numResults";
+
public static final String INSTANCE_CMD = "instance";
public static final String START_OPT = "start";
public static final String END_OPT = "end";
@@ -148,12 +158,10 @@ public class FalconCLI {
parser.addCommand(ADMIN_CMD, "", "admin operations", createAdminOptions(), true);
parser.addCommand(HELP_CMD, "", "display usage", new Options(), false);
parser.addCommand(VERSION_CMD, "", "show client version", new Options(), false);
- parser.addCommand(ENTITY_CMD,
- "",
+ parser.addCommand(ENTITY_CMD, "",
"Entity operations like submit, suspend, resume, delete, status, definition, submitAndSchedule",
entityOptions(), false);
- parser.addCommand(INSTANCE_CMD,
- "",
+ parser.addCommand(INSTANCE_CMD, "",
"Process instances operations like running, status, kill, suspend, resume, rerun, logs",
instanceOptions(), false);
parser.addCommand(GRAPH_CMD, "", "graph operations", createGraphOptions(), true);
@@ -178,7 +186,6 @@ public class FalconCLI {
graphCommand(commandLine, client);
}
}
-
return exitValue;
} catch (ParseException ex) {
ERR.get().println("Invalid sub-command: " + ex.getMessage());
@@ -213,20 +220,30 @@ public class FalconCLI {
String start = commandLine.getOptionValue(START_OPT);
String end = commandLine.getOptionValue(END_OPT);
String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
- String runid = commandLine.getOptionValue(RUNID_OPT);
+ String runId = commandLine.getOptionValue(RUNID_OPT);
String colo = commandLine.getOptionValue(COLO_OPT);
String clusters = commandLine.getOptionValue(CLUSTERS_OPT);
String sourceClusters = commandLine.getOptionValue(SOURCECLUSTER_OPT);
List<LifeCycle> lifeCycles = getLifeCycle(commandLine.getOptionValue(LIFECYCLE_OPT));
+ String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
+ String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
+ Integer offset = validateIntInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
+ Integer numResults = validateIntInput(commandLine.getOptionValue(NUM_RESULTS_OPT), -1, "numResults");
colo = getColo(colo);
-
+ String instanceAction = "instance";
validateInstanceCommands(optionsList, entity, type, start, colo);
+
if (optionsList.contains(RUNNING_OPT)) {
- result = client.getRunningInstances(type, entity, colo, lifeCycles);
- } else if (optionsList.contains(STATUS_OPT)) {
- result = client.getStatusOfInstances(type, entity, start, end, colo, lifeCycles);
+ validateOrderBy(orderBy, instanceAction);
+ validateFilterBy(filterBy, instanceAction);
+ result = client.getRunningInstances(type, entity, colo, lifeCycles, filterBy, orderBy, offset, numResults);
+ } else if (optionsList.contains(STATUS_OPT) || optionsList.contains(LIST_OPT)) {
+ validateOrderBy(orderBy, instanceAction);
+ validateFilterBy(filterBy, instanceAction);
+ result = client.getStatusOfInstances(type, entity, start, end, colo, lifeCycles,
+ filterBy, orderBy, offset, numResults);
} else if (optionsList.contains(SUMMARY_OPT)) {
result = client.getSummaryOfInstances(type, entity, start, end, colo, lifeCycles);
} else if (optionsList.contains(KILL_OPT)) {
@@ -241,7 +258,10 @@ public class FalconCLI {
} else if (optionsList.contains(CONTINUE_OPT)) {
result = client.rerunInstances(type, entity, start, end, colo, clusters, sourceClusters, lifeCycles);
} else if (optionsList.contains(LOG_OPT)) {
- result = client.getLogsOfInstances(type, entity, start, end, colo, runid, lifeCycles);
+ validateOrderBy(orderBy, instanceAction);
+ validateFilterBy(filterBy, instanceAction);
+ result = client.getLogsOfInstances(type, entity, start, end, colo, runId, lifeCycles,
+ filterBy, orderBy, offset, numResults);
} else if (optionsList.contains(PARARMS_OPT)) {
// start time is the nominal time of instance
result = client.getParamsOfInstance(type, entity, start, colo, clusters, sourceClusters, lifeCycles);
@@ -252,24 +272,37 @@ public class FalconCLI {
OUT.get().println(result);
}
+ /**
+  * Parses an optional integer option value taken from the command line.
+  *
+  * @param optionValue raw option value, or null when the option was not supplied
+  * @param defaultVal  value returned when optionValue is null
+  * @param optionName  option name quoted in the error message
+  * @return the parsed integer, or defaultVal when optionValue is null
+  * @throws FalconCLIException if optionValue is not a valid integer
+  */
+ private Integer validateIntInput(String optionValue, int defaultVal, String optionName) throws FalconCLIException {
+     // Option not given on the command line: fall back to the caller's default.
+     if (optionValue == null) {
+         return defaultVal;
+     }
+     try {
+         return Integer.parseInt(optionValue);
+     } catch (NumberFormatException e) {
+         throw new FalconCLIException("Input value provided for queryParam \"" + optionName
+                 + "\" is not a valid Integer");
+     }
+ }
+
private void validateInstanceCommands(Set<String> optionsList,
String entity, String type,
String start, String colo) throws FalconCLIException {
- if (entity == null || entity.equals("")) {
+ if (StringUtils.isEmpty(entity)) {
throw new FalconCLIException("Missing argument: name");
}
- if (type == null || type.equals("")) {
+ if (StringUtils.isEmpty(type)) {
throw new FalconCLIException("Missing argument: type");
}
- if (colo == null || colo.equals("")) {
+ if (StringUtils.isEmpty(colo)) {
throw new FalconCLIException("Missing argument: colo");
}
if (!optionsList.contains(RUNNING_OPT)) {
- if (start == null || start.equals("")) {
+ if (StringUtils.isEmpty(start)) {
throw new FalconCLIException("Missing argument: start");
}
}
@@ -306,7 +339,12 @@ public class FalconCLI {
String filePath = commandLine.getOptionValue(FILE_PATH_OPT);
String colo = commandLine.getOptionValue(COLO_OPT);
String time = commandLine.getOptionValue(EFFECTIVE_OPT);
-
+ String orderBy = commandLine.getOptionValue(ORDER_BY_OPT);
+ String filterBy = commandLine.getOptionValue(FILTER_BY_OPT);
+ String filterTags = commandLine.getOptionValue(TAGS_OPT);
+ String fields = commandLine.getOptionValue(FIELDS_OPT);
+ Integer offset = validateIntInput(commandLine.getOptionValue(OFFSET_OPT), 0, "offset");
+ Integer numResults =validateIntInput(commandLine.getOptionValue(NUM_RESULTS_OPT), -1, "numResults");
validateEntityType(entityType);
if (optionsList.contains(SUBMIT_OPT)) {
@@ -317,7 +355,7 @@ public class FalconCLI {
validateFilePath(filePath);
validateColo(optionsList);
validateEntityName(entityName);
- Date effectiveTime = validateTime(time);
+ Date effectiveTime = parseDateString(time);
result = client.update(entityType, entityName, filePath, effectiveTime);
} else if (optionsList.contains(SUBMIT_AND_SCHEDULE_OPT)) {
validateFilePath(filePath);
@@ -357,7 +395,11 @@ public class FalconCLI {
result = client.getDependency(entityType, entityName).toString();
} else if (optionsList.contains(LIST_OPT)) {
validateColo(optionsList);
- EntityList entityList = client.getEntityList(entityType);
+ validateEntityFields(fields);
+ validateOrderBy(orderBy, "entity");
+ validateFilterBy(filterBy, "entity");
+ EntityList entityList = client.getEntityList(entityType, fields, filterBy,
+ filterTags, orderBy, offset, numResults);
result = entityList != null ? entityList.toString() : "No entity of type (" + entityType + ") found.";
} else if (optionsList.contains(HELP_CMD)) {
OUT.get().println("Falcon Help");
@@ -378,7 +420,7 @@ public class FalconCLI {
private void validateFilePath(String filePath)
throws FalconCLIException {
- if (filePath == null || filePath.equals("")) {
+ if (StringUtils.isEmpty(filePath)) {
throw new FalconCLIException("Missing argument: file");
}
}
@@ -391,7 +433,60 @@ public class FalconCLI {
}
}
- private Date validateTime(String time) throws FalconCLIException {
+ /**
+  * Validates the comma separated value of the fields option.
+  * Each name is matched case-insensitively against {@link EntityList.EntityFieldList};
+  * a null or empty value is accepted without further checks.
+  *
+  * @param fields comma separated field names, may be null or empty
+  * @throws FalconCLIException if any name is not an EntityFieldList constant
+  */
+ private void validateEntityFields(String fields) throws FalconCLIException {
+     if (StringUtils.isEmpty(fields)) {
+         return;
+     }
+     for (String field : fields.split(",")) {
+         try {
+             // Locale.ROOT keeps the case mapping independent of the JVM default locale;
+             // under a Turkish locale "i" does not upper-case to "I", so "pipelines" or
+             // "type" would never match the enum constant.
+             EntityList.EntityFieldList.valueOf(field.toUpperCase(java.util.Locale.ROOT));
+         } catch (IllegalArgumentException ie) {
+             throw new FalconCLIException("Invalid fields argument : " + FIELDS_OPT);
+         }
+     }
+ }
+
+ /**
+  * Validates the keys of a comma separated list of KEY:VALUE filters (e.g. "STATUS:RUNNING").
+  * Only the key, i.e. the text before the first ':', is checked; it is matched
+  * case-insensitively against {@link EntityList.EntityFilterByFields} for filterType "entity"
+  * and against {@link InstancesResult.InstanceFilterFields} for filterType "instance".
+  * A null or empty filterBy is accepted without further checks.
+  *
+  * @param filterBy   comma separated KEY:VALUE pairs, may be null or empty
+  * @param filterType "entity" or "instance"
+  * @throws FalconCLIException if a key is not valid for the filter type, or if filterType is
+  *                            neither "entity" nor "instance"
+  */
+ private void validateFilterBy(String filterBy, String filterType) throws FalconCLIException {
+     if (StringUtils.isEmpty(filterBy)) {
+         return;
+     }
+     for (String filter : filterBy.split(",")) {
+         // Locale.ROOT keeps the case mapping independent of the JVM default locale
+         // (under a Turkish locale "i" does not upper-case to "I").
+         String key = filter.split(":", 2)[0].toUpperCase(java.util.Locale.ROOT);
+         try {
+             if (filterType.equals("entity")) {
+                 EntityList.EntityFilterByFields.valueOf(key);
+             } else if (filterType.equals("instance")) {
+                 InstancesResult.InstanceFilterFields.valueOf(key);
+             } else {
+                 // Caught just below and reported as an invalid filterBy argument.
+                 throw new IllegalArgumentException("Invalid API call");
+             }
+         } catch (IllegalArgumentException ie) {
+             throw new FalconCLIException("Invalid filterBy argument : " + FILTER_BY_OPT);
+         }
+     }
+ }
+
+ /**
+  * Validates the orderBy option for the given kind of listing.
+  * Accepted values, matched case-insensitively, are status, cluster, starttime and endtime
+  * for action "instance", and type and name for action "entity".
+  * A null or empty orderBy is accepted without further checks.
+  *
+  * @param orderBy field to order by, may be null or empty
+  * @param action  "instance" or "entity"
+  * @throws FalconCLIException if orderBy is not accepted for the action, or if the action is
+  *                            neither "instance" nor "entity"
+  */
+ private void validateOrderBy(String orderBy, String action) throws FalconCLIException {
+     if (StringUtils.isEmpty(orderBy)) {
+         return;
+     }
+     // Locale.ROOT keeps the case mapping independent of the JVM default locale;
+     // under a Turkish locale "STARTTIME" would lower-case to a dotless-i string
+     // and be rejected.
+     String normalized = orderBy.toLowerCase(java.util.Locale.ROOT);
+     if (action.equals("instance")) {
+         if (Arrays.asList("status", "cluster", "starttime", "endtime").contains(normalized)) {
+             return;
+         }
+     } else if (action.equals("entity")) {
+         if (Arrays.asList("type", "name").contains(normalized)) {
+             return;
+         }
+     }
+     throw new FalconCLIException("Invalid orderBy argument : " + ORDER_BY_OPT);
+ }
+
+
+ private Date parseDateString(String time) throws FalconCLIException {
if (time != null && !time.isEmpty()) {
try {
return SchemaHelper.parseDateUTC(time);
@@ -404,7 +499,7 @@ public class FalconCLI {
private void validateEntityName(String entityName)
throws FalconCLIException {
- if (entityName == null || entityName.equals("")) {
+ if (StringUtils.isEmpty(entityName)) {
throw new FalconCLIException("Missing argument: name");
}
}
@@ -412,7 +507,7 @@ public class FalconCLI {
private void validateEntityType(String entityType)
throws FalconCLIException {
- if (entityType == null || entityType.equals("")) {
+ if (StringUtils.isEmpty(entityType)) {
throw new FalconCLIException("Missing argument: type");
}
}
@@ -495,6 +590,16 @@ public class FalconCLI {
"Colo name");
colo.setRequired(false);
Option effective = new Option(EFFECTIVE_OPT, true, "Effective time for update");
+ Option fields = new Option(FIELDS_OPT, true, "Entity fields to show for a request");
+ Option filterBy = new Option(FILTER_BY_OPT, true,
+ "Filter returned entities by the specified status");
+ Option filterTags = new Option(TAGS_OPT, true, "Filter returned entities by the specified tags");
+ Option orderBy = new Option(ORDER_BY_OPT, true,
+ "Order returned entities by this field");
+ Option offset = new Option(OFFSET_OPT, true,
+ "Start returning entities from this offset");
+ Option numResults = new Option(NUM_RESULTS_OPT, true,
+ "Number of results to return per request");
entityOptions.addOption(url);
entityOptions.addOptionGroup(group);
@@ -503,6 +608,12 @@ public class FalconCLI {
entityOptions.addOption(filePath);
entityOptions.addOption(colo);
entityOptions.addOption(effective);
+ entityOptions.addOption(fields);
+ entityOptions.addOption(filterBy);
+ entityOptions.addOption(filterTags);
+ entityOptions.addOption(orderBy);
+ entityOptions.addOption(offset);
+ entityOptions.addOption(numResults);
return entityOptions;
}
@@ -513,6 +624,8 @@ public class FalconCLI {
Option running = new Option(RUNNING_OPT, false,
"Gets running process instances for a given process");
+ Option list = new Option(LIST_OPT, false,
+ "Gets all instances for a given process in the range start time and optional end time");
Option status = new Option(
STATUS_OPT,
false,
@@ -560,6 +673,7 @@ public class FalconCLI {
OptionGroup group = new OptionGroup();
group.addOption(running);
+ group.addOption(list);
group.addOption(status);
group.addOption(summary);
group.addOption(kill);
@@ -603,6 +717,14 @@ public class FalconCLI {
true,
"describes life cycle of entity , for feed it can be replication/retention "
+ "and for process it can be execution");
+ Option filterBy = new Option(FILTER_BY_OPT, true,
+ "Filter returned instances by the specified fields");
+ Option orderBy = new Option(ORDER_BY_OPT, true,
+ "Order returned instances by this field");
+ Option offset = new Option(OFFSET_OPT, true,
+ "Start returning instances from this offset");
+ Option numResults = new Option(NUM_RESULTS_OPT, true,
+ "Number of results to return per request");
instanceOptions.addOption(url);
instanceOptions.addOptionGroup(group);
@@ -616,6 +738,10 @@ public class FalconCLI {
instanceOptions.addOption(sourceClusters);
instanceOptions.addOption(colo);
instanceOptions.addOption(lifecycle);
+ instanceOptions.addOption(filterBy);
+ instanceOptions.addOption(offset);
+ instanceOptions.addOption(orderBy);
+ instanceOptions.addOption(numResults);
return instanceOptions;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/client/src/main/java/org/apache/falcon/client/FalconClient.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java
index ed0a0ba..6697227 100644
--- a/client/src/main/java/org/apache/falcon/client/FalconClient.java
+++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java
@@ -24,6 +24,7 @@ import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import com.sun.jersey.client.urlconnection.HTTPSProperties;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.net.util.TrustManagerUtils;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.v0.EntityType;
@@ -196,6 +197,7 @@ public class FalconClient {
protected static enum Instances {
RUNNING("api/instance/running/", HttpMethod.GET, MediaType.APPLICATION_JSON),
STATUS("api/instance/status/", HttpMethod.GET, MediaType.APPLICATION_JSON),
+ LIST("api/instance/list", HttpMethod.GET, MediaType.APPLICATION_JSON),
KILL("api/instance/kill/", HttpMethod.POST, MediaType.APPLICATION_JSON),
SUSPEND("api/instance/suspend/", HttpMethod.POST, MediaType.APPLICATION_JSON),
RESUME("api/instance/resume/", HttpMethod.POST, MediaType.APPLICATION_JSON),
@@ -329,23 +331,28 @@ public class FalconClient {
return sendDependencyRequest(Entities.DEPENDENCY, entityType, entityName);
}
- public EntityList getEntityList(String entityType) throws FalconCLIException {
- return sendListRequest(Entities.LIST, entityType);
+ public EntityList getEntityList(String entityType, String fields, String filterBy, String filterTags,
+ String orderBy, Integer offset, Integer numResults) throws FalconCLIException {
+ return sendListRequest(Entities.LIST, entityType, fields, filterBy,
+ filterTags, orderBy, offset, numResults);
}
- public String getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles)
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ public String getRunningInstances(String type, String entity, String colo, List<LifeCycle> lifeCycles,
+ String filterBy, String orderBy, Integer offset, Integer numResults)
throws FalconCLIException {
return sendInstanceRequest(Instances.RUNNING, type, entity, null, null,
- null, null, colo, lifeCycles);
+ null, null, colo, lifeCycles, filterBy, orderBy, offset, numResults);
}
public String getStatusOfInstances(String type, String entity,
String start, String end,
- String colo, List<LifeCycle> lifeCycles) throws FalconCLIException {
+ String colo, List<LifeCycle> lifeCycles, String filterBy,
+ String orderBy, Integer offset, Integer numResults) throws FalconCLIException {
return sendInstanceRequest(Instances.STATUS, type, entity, start, end,
- null, null, colo, lifeCycles);
+ null, null, colo, lifeCycles, filterBy, orderBy, offset, numResults);
}
public String getSummaryOfInstances(String type, String entity,
@@ -355,7 +362,7 @@ public class FalconClient {
return sendInstanceRequest(Instances.SUMMARY, type, entity, start, end,
null, null, colo, lifeCycles);
}
- //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+
public String killInstances(String type, String entity, String start,
String end, String colo, String clusters,
String sourceClusters, List<LifeCycle> lifeCycles)
@@ -419,11 +426,12 @@ public class FalconClient {
public String getLogsOfInstances(String type, String entity, String start,
String end, String colo, String runId,
- List<LifeCycle> lifeCycles)
+ List<LifeCycle> lifeCycles, String filterBy,
+ String orderBy, Integer offset, Integer numResults)
throws FalconCLIException {
return sendInstanceRequest(Instances.LOG, type, entity, start, end,
- null, runId, colo, lifeCycles);
+ null, runId, colo, lifeCycles, filterBy, orderBy, offset, numResults);
}
public String getParamsOfInstance(String type, String entity,
@@ -540,20 +548,6 @@ public class FalconClient {
return parseEntityList(clientResponse);
}
- private EntityList sendListRequest(Entities entities, String entityType)
- throws FalconCLIException {
-
- ClientResponse clientResponse = service
- .path(entities.path).path(entityType)
- .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
- .accept(entities.mimeType).type(MediaType.TEXT_XML)
- .method(entities.method, ClientResponse.class);
-
- checkIfSuccessful(clientResponse);
-
- return parseEntityList(clientResponse);
- }
-
private String sendEntityRequestWithObject(Entities entities, String entityType,
Object requestObject, String colo) throws FalconCLIException {
WebResource resource = service.path(entities.path)
@@ -576,6 +570,14 @@ public class FalconClient {
String entity, String start, String end, InputStream props,
String runid, String colo,
List<LifeCycle> lifeCycles) throws FalconCLIException {
+ return sendInstanceRequest(instances, type, entity, start, end, props,
+ runid, colo, lifeCycles, "", "", 0, -1);
+ }
+
+ private String sendInstanceRequest(Instances instances, String type, String entity,
+ String start, String end, InputStream props, String runid, String colo,
+ List<LifeCycle> lifeCycles, String filterBy,
+ String orderBy, Integer offset, Integer numResults) throws FalconCLIException {
checkType(type);
WebResource resource = service.path(instances.path).path(type)
.path(entity);
@@ -591,6 +593,14 @@ public class FalconClient {
if (colo != null) {
resource = resource.queryParam("colo", colo);
}
+ if (!StringUtils.isEmpty(filterBy)) {
+ resource = resource.queryParam("filterBy", filterBy);
+ }
+ if (!StringUtils.isEmpty(orderBy)) {
+ resource = resource.queryParam("orderBy", orderBy);
+ }
+ resource = resource.queryParam("offset", offset.toString());
+ resource = resource.queryParam("numResults", numResults.toString());
if (lifeCycles != null) {
checkLifeCycleOption(lifeCycles, type);
@@ -648,10 +658,38 @@ public class FalconClient {
}
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+ private EntityList sendListRequest(Entities entities, String entityType, String fields, String filterBy,
+ String filterTags, String orderBy, Integer offset,
+ Integer numResults) throws FalconCLIException {
+ WebResource resource = service.path(entities.path)
+ .path(entityType);
+ if (!StringUtils.isEmpty(filterBy)) {
+ resource = resource.queryParam("filterBy", filterBy);
+ }
+ if (!StringUtils.isEmpty(orderBy)) {
+ resource = resource.queryParam("orderBy", orderBy);
+ }
+ if (!StringUtils.isEmpty(fields)) {
+ resource = resource.queryParam("fields", fields);
+ }
+ if (!StringUtils.isEmpty(filterTags)) {
+ resource = resource.queryParam("tags", filterTags);
+ }
+ resource = resource.queryParam("offset", offset.toString());
+ resource = resource.queryParam("numResults", numResults.toString());
+ ClientResponse clientResponse = resource
+ .header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
+ .accept(entities.mimeType).type(MediaType.TEXT_XML)
+ .method(entities.method, ClientResponse.class);
- private String sendAdminRequest(AdminOperations job)
- throws FalconCLIException {
+ checkIfSuccessful(clientResponse);
+ return parseEntityList(clientResponse);
+ }
+ // RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
+ private String sendAdminRequest(AdminOperations job) throws FalconCLIException {
ClientResponse clientResponse = service.path(job.path)
.header("Cookie", AUTH_COOKIE_EQ + authenticationToken)
.accept(job.mimeType)
@@ -747,7 +785,7 @@ public class FalconClient {
? SchemaHelper.formatDateUTC(instance.getEndTime()) : "-";
sb.append(toAppend).append("\t");
- toAppend = (instance.getDetails() != null && !instance.getDetails().equals(""))
+ toAppend = (!StringUtils.isEmpty(instance.getDetails()))
? instance.getDetails() : "-";
sb.append(toAppend).append("\t");
@@ -884,9 +922,7 @@ public class FalconClient {
return parseStringResult(clientResponse);
}
- private void checkIfSuccessful(ClientResponse clientResponse)
- throws FalconCLIException {
-
+ private void checkIfSuccessful(ClientResponse clientResponse) throws FalconCLIException {
Response.Status.Family statusFamily = clientResponse.getClientResponseStatus().getFamily();
if (statusFamily != Response.Status.Family.SUCCESSFUL
&& statusFamily != Response.Status.Family.INFORMATIONAL) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/client/src/main/java/org/apache/falcon/resource/EntityList.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/EntityList.java b/client/src/main/java/org/apache/falcon/resource/EntityList.java
index 2de177d..f67b84b 100644
--- a/client/src/main/java/org/apache/falcon/resource/EntityList.java
+++ b/client/src/main/java/org/apache/falcon/resource/EntityList.java
@@ -41,6 +41,20 @@ public class EntityList {
private final EntityElement[] elements;
/**
+ * List of fields returned by RestAPI.
+ */
+ public static enum EntityFieldList {
+ TYPE, NAME, STATUS, TAGS, PIPELINES
+ }
+
+ /**
+ * Fields by which the RestAPI supports filtering entities.
+ */
+ public static enum EntityFilterByFields {
+ TYPE, NAME, STATUS, PIPELINES
+ }
+
+ /**
* Element within an entity.
*/
public static class EntityElement {
@@ -53,6 +67,8 @@ public class EntityList {
public String status;
@XmlElementWrapper(name = "list")
public List<String> tag;
+ @XmlElementWrapper(name = "list")
+ public List<String> pipelines;
//RESUME CHECKSTYLE CHECK VisibilityModifierCheck
@Override
@@ -65,6 +81,10 @@ public class EntityList {
if (tag != null && !tag.isEmpty()) {
outString += " - " + tag.toString();
}
+
+ if (pipelines != null && !pipelines.isEmpty()) {
+ outString += " - " + pipelines.toString();
+ }
outString += "\n";
return outString;
}
@@ -83,16 +103,21 @@ public class EntityList {
int len = elements.length;
EntityElement[] items = new EntityElement[len];
for (int i = 0; i < len; i++) {
- Entity e = elements[i];
- EntityElement o = new EntityElement();
- o.type = e.getEntityType().name().toLowerCase();
- o.name = e.getName();
- o.status = null;
- items[i] = o;
+ items[i] = createEntityElement(elements[i]);
}
this.elements = items;
}
+ private EntityElement createEntityElement(Entity e) {
+ EntityElement element = new EntityElement();
+ element.type = e.getEntityType().name().toLowerCase();
+ element.name = e.getName();
+ element.status = null;
+ element.tag = new ArrayList<String>();
+ element.pipelines = new ArrayList<String>();
+ return element;
+ }
+
public EntityList(Entity[] dependentEntities, Entity entity) {
int len = dependentEntities.length;
EntityElement[] items = new EntityElement[len];
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
index c3c93f2..5754f97 100644
--- a/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
+++ b/client/src/main/java/org/apache/falcon/resource/InstancesResult.java
@@ -39,6 +39,13 @@ public class InstancesResult extends APIResult {
WAITING, RUNNING, SUSPENDED, KILLED, FAILED, SUCCEEDED, ERROR
}
+ /**
+ * Instance fields that the RestAPI supports in filterBy.
+ */
+ public static enum InstanceFilterFields {
+ STATUS, CLUSTER, SOURCECLUSTER, STARTEDAFTER
+ }
+
@XmlElement
private Instance[] instances;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/docs/src/site/twiki/restapi/EntityList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/EntityList.twiki b/docs/src/site/twiki/restapi/EntityList.twiki
index b9cf349..678b907 100644
--- a/docs/src/site/twiki/restapi/EntityList.twiki
+++ b/docs/src/site/twiki/restapi/EntityList.twiki
@@ -9,8 +9,16 @@ Get list of the entities.
---++ Parameters
* :entity-type can be cluster, feed or process.
- * :fields (optional) additional fields that the client are interested in, separated by commas.
- Currently falcon only support status as a valid field.
+ * :fields <optional param> additional fields that the client is interested in, separated by commas.
+ * Currently falcon supports STATUS, TAGS, PIPELINES as valid fields.
+ * :filterBy <optional param> Filter results by a given list of field:value pairs, separated by commas. Example: filterBy=STATUS:RUNNING,PIPELINES:clickLogs
+ * Supported filter fields are TYPE, NAME, STATUS, PIPELINES
+ * Only the entities that match all of the given filters are returned.
+ * :tags <optional param> Filter results by a given list of tags, separated by a comma. Example: tags=consumer=consumer@xyz.com,owner=producer@xyz.com
+ * :orderBy <optional param> is the field by which results should be ordered.
+ * Supports ordering by "name","type"
+ * :offset <optional param> start showing results from the offset, used for pagination. Integer, Default is 0
+ * :numResults <optional param> number of results to show per request, used for pagination. Integer, Default is -1 (means all results)
---++ Results
List of the entities.
@@ -58,3 +66,28 @@ GET http://localhost:15000/api/entities/list/feed?fields=status
}
</verbatim>
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/entities/list/feed?filterBy=STATUS:RUNNING,PIPELINES:dataReplication&fields=status,pipelines,tags&tags=consumer=consumer@xyz.com&orderBy=name&offset=2&numResults=2
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "entity": [
+ {
+ "name" : "SampleInput2",
+ "type" : "feed",
+ "status": "RUNNING",
+ "pipelines": "dataReplication",
+ "tags": "consumer=consumer@xyz.com"
+ },
+ {
+ "name": "SampleInput3",
+ "type": "feed",
+ "status": "RUNNING",
+ "pipelines": "dataReplication",
+ "tags": "consumer=consumer@xyz.com,owner=producer@xyz.com"
+ }
+ ]
+}
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/docs/src/site/twiki/restapi/InstanceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceList.twiki b/docs/src/site/twiki/restapi/InstanceList.twiki
new file mode 100644
index 0000000..c2059f2
--- /dev/null
+++ b/docs/src/site/twiki/restapi/InstanceList.twiki
@@ -0,0 +1,86 @@
+---++ GET /api/instance/list/:entity-type/:entity-name
+ * <a href="#Description">Description</a>
+ * <a href="#Parameters">Parameters</a>
+ * <a href="#Results">Results</a>
+ * <a href="#Examples">Examples</a>
+
+---++ Description
+Get list of all instances of a given entity.
+
+---++ Parameters
+ * :entity-type can either be a feed or a process.
+ * :entity-name is name of the entity.
+ * start is the start time of the instance that you want to refer to
+ * end <optional param> is the end time of the instance that you want to refer to
+ * colo <optional param> colo on which the query should be run
+ * lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
+ * filterBy <optional param> Filter results by a given list of field:value pairs, separated by commas. Example: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster
+ * Supported filter fields are STATUS, CLUSTER, SOURCECLUSTER, STARTEDAFTER
+ * Only the instances that match all of the given filters are returned.
+ * orderBy <optional param> is the instance field by which results should be ordered.
+ * Supports ordering by "status","startTime","endTime","cluster"
+ * offset <optional param> start showing results from the offset, used for pagination. Integer, Default is 0
+ * numResults <optional param> number of results to show per request, used for pagination. Integer, Default is -1 (means all results)
+
+---++ Results
+List of instances of given entity.
+
+---++ Examples
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/list/process/SampleProcess?colo=*&start=2012-04-03T07:00Z
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "instances": [
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:40:26-07:00",
+ "startTime": "2013-10-21T14:39:56-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+ "status": "SUCCEEDED",
+ "instance": "2012-04-03T07:00Z"
+ }
+ ],
+ "requestId": "default\/e15bb378-d09f-4911-9df2-5334a45153d2\n",
+ "message": "default\/STATUS\n",
+ "status": "SUCCEEDED"
+}
+</verbatim>
+
+
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/list/process/SampleProcess?colo=*&start=2012-04-03T07:00Z&filterBy=STATUS:SUCCEEDED,CLUSTER:primary-cluster&orderBy=startTime&offset=2&numResults=2
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "instances": [
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:40:26-07:00",
+ "startTime": "2013-10-21T14:39:56-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+ "status": "SUCCEEDED",
+ "instance": "2012-04-03T07:00Z"
+ },
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:42:26-07:00",
+ "startTime": "2013-10-21T14:41:56-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933397-oozie-rgau-W",
+ "status": "SUCCEEDED",
+ "instance": "2012-04-03T08:00Z"
+ }
+ ],
+
+ "requestId": "default\/e15bb378-d09f-4911-9df2-5334a45153d2\n",
+ "message": "default\/STATUS\n",
+ "status": "SUCCEEDED"
+}
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/docs/src/site/twiki/restapi/InstanceLogs.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceLogs.twiki b/docs/src/site/twiki/restapi/InstanceLogs.twiki
index 8522e75..56d5fe1 100644
--- a/docs/src/site/twiki/restapi/InstanceLogs.twiki
+++ b/docs/src/site/twiki/restapi/InstanceLogs.twiki
@@ -12,8 +12,16 @@ Get log of a specific instance of an entity.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
* end <optional param> is the end time of the instance that you want to refer to
+ * colo <optional param> colo on which the query should be run
* lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
-
+ * filterBy <optional param> Filter results by a given list of field:value pairs, separated by commas. Example: filterBy=STATUS:SUCCEEDED,CLUSTER:primary-cluster
+ * Supported filter fields are STATUS, CLUSTER, SOURCECLUSTER, STARTEDAFTER
+ * Only the instances that match all of the given filters are returned.
+ * orderBy <optional param> is the instance field by which results should be ordered.
+ * Supports ordering by "status","startTime","endTime","cluster"
+ * offset <optional param> start showing results from the offset, used for pagination. Integer, Default is 0
+ * numResults <optional param> number of results to show per request, used for pagination. Integer, Default is -1 (means all results)
+
---++ Results
Log of specified instance.
@@ -48,3 +56,53 @@ GET http://localhost:15000/api/instance/logs/process/SampleProcess?colo=*&start=
"status": "SUCCEEDED"
}
</verbatim>
+
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/logs/process/SampleProcess?colo=*&start=2012-04-03T07:00Z&filterBy=STATUS:SUCCEEDED,CLUSTER:primary-cluster&orderBy=startTime&offset=2&numResults=2
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "instances": [
+ {
+ "actions": [
+ {
+ "logFile": "http:\/\/localhost:50070\/data\/apps\/falcon\/staging\/falcon\/workflows\/process\/SampleProcess\/logs\/job-2012-04-03-07-00\/000\/pig_SUCCEEDED.log",
+ "status": "SUCCEEDED",
+ "action": "pig"
+ }
+ ],
+ "details": "",
+ "endTime": "2013-10-21T14:40:26-07:00",
+ "startTime": "2013-10-21T14:39:56-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:50070\/data\/apps\/falcon\/staging\/falcon\/workflows\/process\/SampleProcess\/logs\/job-2012-04-03-07-00\/000\/oozie.log",
+ "status": "SUCCEEDED",
+ "instance": "2012-04-03T07:00Z"
+ },
+ {
+ "actions": [
+ {
+ "logFile": "http:\/\/localhost:50070\/data\/apps\/falcon\/staging\/falcon\/workflows\/process\/SampleProcess\/logs\/job-2012-04-03-07-00\/001\/pig_SUCCEEDED.log",
+ "status": "SUCCEEDED",
+ "action": "pig"
+ }
+ ],
+ "details": "",
+ "endTime": "2013-10-21T14:42:27-07:00",
+ "startTime": "2013-10-21T14:41:57-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:50070\/data\/apps\/falcon\/staging\/falcon\/workflows\/process\/SampleProcess\/logs\/job-2012-04-03-07-00\/001\/oozie.log",
+ "status": "SUCCEEDED",
+ "instance": "2012-04-03T08:00Z"
+ }
+ ],
+ "requestId": "default\/3527038e-8334-4e50-8173-76c4fa430d0b\n",
+ "message": "default\/STATUS\n",
+ "status": "SUCCEEDED"
+}
+</verbatim>
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/docs/src/site/twiki/restapi/InstanceRunning.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceRunning.twiki b/docs/src/site/twiki/restapi/InstanceRunning.twiki
index 7fde90c..0d04008 100644
--- a/docs/src/site/twiki/restapi/InstanceRunning.twiki
+++ b/docs/src/site/twiki/restapi/InstanceRunning.twiki
@@ -10,7 +10,16 @@ Get a list of instances currently running for a given entity.
---++ Parameters
* :entity-type can either be a feed or a process.
* :entity-name is name of the entity.
+ * colo <optional param> colo on which the query should be run
* lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
+ * filterBy <optional param> Filter results by a given list of field:value pairs, separated by commas. Example: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster
+ * Supported filter fields are STATUS, CLUSTER, SOURCECLUSTER, STARTEDAFTER
+ * Only the instances that match all of the given filters are returned.
+ * orderBy <optional param> is the instance field by which results should be ordered.
+ * Supports ordering by "status","startTime","endTime","cluster"
+ * offset <optional param> start showing results from the offset, used for pagination. Integer, Default is 0
+ * numResults <optional param> number of results to show per request, used for pagination. Integer, Default is -1 (means all results)
+
---++ Results
List of instances currently running.
@@ -35,3 +44,40 @@ GET http://localhost:15000/api/instance/running/process/SampleProcess?colo=*
"status": "SUCCEEDED"
}
</verbatim>
+
+
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/running/process/SampleProcess?colo=*&start=2012-04-03T07:00Z&filterBy=STATUS:RUNNING,CLUSTER:primary-cluster&orderBy=startTime&offset=2&numResults=2
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "instances": [
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:40:26-07:00",
+ "startTime": "2013-10-21T14:39:56-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+ "status": "RUNNING",
+ "instance": "2012-04-03T07:00Z"
+ },
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:42:27-07:00",
+ "startTime": "2013-10-21T14:41:57-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933397-oozie-rgau-W",
+ "status": "RUNNING",
+ "instance": "2012-04-03T08:00Z"
+ }
+ ],
+
+ "requestId": "default\/e15bb378-d09f-4911-9df2-5334a45153d2\n",
+ "message": "default\/STATUS\n",
+ "status": "SUCCEEDED"
+}
+</verbatim>
+
+
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/docs/src/site/twiki/restapi/InstanceStatus.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceStatus.twiki b/docs/src/site/twiki/restapi/InstanceStatus.twiki
index 519f55e..5160252 100644
--- a/docs/src/site/twiki/restapi/InstanceStatus.twiki
+++ b/docs/src/site/twiki/restapi/InstanceStatus.twiki
@@ -12,8 +12,16 @@ Get status of a specific instance of an entity.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
* end <optional param> is the end time of the instance that you want to refer to
+ * colo <optional param> colo on which the query should be run
* lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
-
+ * filterBy <optional param> Filter results by a given list of field:value pairs, separated by commas. Example: filterBy=STATUS:RUNNING,CLUSTER:primary-cluster
+ * Supported filter fields are STATUS, CLUSTER, SOURCECLUSTER, STARTEDAFTER
+ * Only the instances that match all of the given filters are returned.
+ * orderBy <optional param> is the instance field by which results should be ordered.
+ * Supports ordering by "status","startTime","endTime","cluster"
+ * offset <optional param> start showing results from the offset, used for pagination. Integer, Default is 0
+ * numResults <optional param> number of results to show per request, used for pagination. Integer, Default is -1 (means all results)
+
---++ Results
Status of the specified instance.
@@ -41,3 +49,38 @@ GET http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&star
"status": "SUCCEEDED"
}
</verbatim>
+
+
+---+++ Rest Call
+<verbatim>
+GET http://localhost:15000/api/instance/status/process/SampleProcess?colo=*&start=2012-04-03T07:00Z&filterBy=STATUS:SUCCEEDED,CLUSTER:primary-cluster&orderBy=startTime&offset=2&numResults=2
+</verbatim>
+---+++ Result
+<verbatim>
+{
+ "instances": [
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:40:26-07:00",
+ "startTime": "2013-10-21T14:39:56-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933395-oozie-rgau-W",
+ "status": "SUCCEEDED",
+ "instance": "2012-04-03T07:00Z"
+ },
+ {
+ "details": "",
+ "endTime": "2013-10-21T14:42:26-07:00",
+ "startTime": "2013-10-21T14:41:56-07:00",
+ "cluster": "primary-cluster",
+ "logFile": "http:\/\/localhost:11000\/oozie?job=0000070-131021115933397-oozie-rgau-W",
+ "status": "SUCCEEDED",
+ "instance": "2012-04-03T08:00Z"
+ }
+ ],
+
+ "requestId": "default\/e15bb378-d09f-4911-9df2-5334a45153d2\n",
+ "message": "default\/STATUS\n",
+ "status": "SUCCEEDED"
+}
+</verbatim>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/docs/src/site/twiki/restapi/InstanceSummary.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/InstanceSummary.twiki b/docs/src/site/twiki/restapi/InstanceSummary.twiki
index ee1d41f..4fa5780 100644
--- a/docs/src/site/twiki/restapi/InstanceSummary.twiki
+++ b/docs/src/site/twiki/restapi/InstanceSummary.twiki
@@ -12,6 +12,7 @@ Get summary of instance/instances of an entity.
* :entity-name is name of the entity.
* start is the start time of the instance that you want to refer to
* end <optional param> is the end time of the instance that you want to refer to
+ * colo <optional param> colo on which the query should be run
* lifecycle <optional param> can be Eviction/Replication(default) for feed and Execution(default) for process.
---++ Results
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/docs/src/site/twiki/restapi/ResourceList.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/restapi/ResourceList.twiki b/docs/src/site/twiki/restapi/ResourceList.twiki
index 677880c..4b377e8 100644
--- a/docs/src/site/twiki/restapi/ResourceList.twiki
+++ b/docs/src/site/twiki/restapi/ResourceList.twiki
@@ -48,19 +48,21 @@ See also: [[../Security.twiki][Security in Falcon]]
| DELETE | [[EntityDelete][api/entities/delete/:entity-type/:entity-name]] | Delete the entity |
| GET | [[EntityStatus][api/entities/status/:entity-type/:entity-name]] | Get the status of the entity |
| GET | [[EntityDefinition][api/entities/definition/:entity-type/:entity-name]] | Get the definition of the entity |
-| GET | [[EntityList][api/entities/list/:entity-type?fields=:fields]] | Get the list of entities |
+| GET | [[EntityList][api/entities/list/:entity-type?fields=:fields&filterBy=:filterBy&tags=:tags&orderBy=:orderBy&offset=:offset&numResults=:numResults]] | Get the list of entities |
| GET | [[EntityDependencies][api/entities/dependencies/:entity-type/:entity-name]] | Get the dependencies of the entity |
---++ REST Call on Feed and Process Instances
-| *Call Type* | *Resource* | *Description* |
-| GET | [[InstanceRunning][api/instance/running/:entity-type/:entity-name]] | List of running instances. |
-| GET | [[InstanceStatus][api/instance/status/:entity-type/:entity-name]] | Status of a given instance |
-| POST | [[InstanceKill][api/instance/kill/:entity-type/:entity-name]] | Kill a given instance |
-| POST | [[InstanceSuspend][api/instance/suspend/:entity-type/:entity-name]] | Suspend a running instance |
-| POST | [[InstanceResume][api/instance/resume/:entity-type/:entity-name]] | Resume a given instance |
-| POST | [[InstanceRerun][api/instance/rerun/:entity-type/:entity-name]] | Rerun a given instance |
-| GET | [[InstanceLogs][api/instance/logs/:entity-type/:entity-name]] | Get logs of a given instance |
+| *Call Type* | *Resource* | *Description* |
+| GET | [[InstanceRunning][api/instance/running/:entity-type/:entity-name?colo=:colo&lifecycle=:lifecycle&...]] | List of running instances. |
+| GET | [[InstanceList][api/instance/list/:entity-type/:entity-name?start=:start&end=:end&colo=:colo&lifecycle=:lifecycle&...]] | List of instances |
+| GET | [[InstanceStatus][api/instance/status/:entity-type/:entity-name?start=:start&end=:end&colo=:colo&lifecycle=:lifecycle&...]] | Status of a given instance |
+| POST | [[InstanceKill][api/instance/kill/:entity-type/:entity-name]] | Kill a given instance |
+| POST | [[InstanceSuspend][api/instance/suspend/:entity-type/:entity-name]] | Suspend a running instance |
+| POST | [[InstanceResume][api/instance/resume/:entity-type/:entity-name]] | Resume a given instance |
+| POST | [[InstanceRerun][api/instance/rerun/:entity-type/:entity-name]] | Rerun a given instance |
+| GET | [[InstanceLogs][api/instance/logs/:entity-type/:entity-name?start=:start&end=:end&colo=:colo&lifecycle=:lifecycle&...]] | Get logs of a given instance |
+| GET | [[InstanceSummary][api/instance/summary/:entity-type/:entity-name?start=:start&end=:end&colo=:colo&lifecycle=:lifecycle]] | Return summary of instances for an entity |
---++ REST Call on Lineage Graph
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
index 509d2fd..84ae446 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java
@@ -33,7 +33,10 @@ import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.store.EntityAlreadyExistsException;
import org.apache.falcon.entity.v0.*;
import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.feed.Feed;
+import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.resource.APIResult.Status;
+import org.apache.falcon.resource.EntityList.EntityElement;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.DeploymentUtil;
@@ -41,7 +44,7 @@ import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
import org.apache.hadoop.io.IOUtils;
-import org.datanucleus.util.StringUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -196,7 +199,7 @@ public abstract class AbstractEntityManager {
* Deletes a scheduled entity, a deleted entity is removed completely from
* execution pool.
*
- * @param type entity type
+ * @param type entity type
* @param entity entity name
* @return APIResult
*/
@@ -459,55 +462,156 @@ public abstract class AbstractEntityManager {
/**
* Returns the list of entities registered of a given type.
*
- * @param type entity type
- * @param fieldStr fields that the query is interested in, separated by comma
- * @return String
+ * @param type Only return entities of this type
+ * @param fieldStr fields that the query is interested in, separated by comma
+ * @param filterBy filter by a specific field.
+ * @param offset Pagination offset
+ * @param resultsPerPage Number of results that should be returned starting at the offset
+ * @return EntityList
*/
- public EntityList getEntityList(String type, String fieldStr) {
- HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.split(",")));
+ public EntityList getEntityList(String type, String fieldStr, String filterBy, String filterTags,
+ String orderBy, Integer offset, Integer resultsPerPage) {
- // Currently only the status of the entity is supported
- boolean requireStatus = fields.contains("status");
+ HashSet<String> fields = new HashSet<String>(Arrays.asList(fieldStr.toLowerCase().split(",")));
+ final HashMap<String, String> filterByFieldsValues = getFilterByFieldsValues(filterBy);
+ final ArrayList<String> filterByTags = getFilterByTags(filterTags);
- try {
- EntityType entityType = EntityType.valueOf(type.toUpperCase());
- final String entityTypeString = type.toLowerCase();
- Collection<String> entityNames = configStore.getEntities(entityType);
- if (entityNames == null || entityNames.isEmpty()) {
- return new EntityList(new Entity[]{});
+ EntityType entityType = EntityType.valueOf(type.toUpperCase());
+ Collection<String> entityNames = configStore.getEntities(entityType);
+ if (entityNames == null || entityNames.isEmpty()) {
+ return new EntityList(new Entity[]{});
+ }
+
+ ArrayList<Entity> entities = new ArrayList<Entity>();
+ for (String entityName : entityNames) {
+ Entity entity;
+ try {
+ entity = configStore.get(entityType, entityName);
+ if (entity == null) {
+ continue;
+ }
+ } catch (FalconException e1) {
+ LOG.error("Unable to get list for entities for ({})", type, e1);
+ throw FalconWebException.newException(e1, Response.Status.BAD_REQUEST);
+ }
+
+ List<String> tags = getTags(entity);
+ List<String> pipelines = getPipelines(entity);
+ String entityStatus = getStatusString(entity);
+
+ if (filterEntity(entity, entityStatus,
+ filterByFieldsValues, filterByTags, tags, pipelines)) {
+ continue;
}
+ entities.add(entity);
+ }
+ // Sort entities before returning a subset of entity elements.
+ entities = sortEntities(entities, orderBy);
- int len = entityNames.size();
- EntityList.EntityElement[] elements = new EntityList.EntityElement[len];
+ int pageCount = getRequiredNumberOfResults(entities.size(), offset, resultsPerPage);
+ if (pageCount == 0) { // handle pagination
+ return new EntityList(new Entity[]{});
+ }
+
+ return new EntityList(buildEntityElements(offset, fields, entities, pageCount));
+ }
- int i = 0;
- for (String entityName : entityNames) {
- Entity e = configStore.get(entityType, entityName);
- if (SecurityUtil.isAuthorizationEnabled() && !isEntityAuthorized(e)) {
- // the user who requested list query has no permission to access this entity.
- continue; // Skip this entity
+ protected static HashMap<String, String> getFilterByFieldsValues(String filterBy) {
+ //Filter the results by specific field:value
+ HashMap<String, String> filterByFieldValues = new HashMap<String, String>();
+ if (!StringUtils.isEmpty(filterBy)) {
+ String[] fieldValueArray = filterBy.split(",");
+ for (String fieldValue : fieldValueArray) {
+ String[] splits = fieldValue.split(":", 2);
+ String filterByField = splits[0];
+ if (splits.length == 2) {
+ filterByFieldValues.put(filterByField, splits[1]);
+ } else {
+ filterByFieldValues.put(filterByField, "");
}
+ }
+ }
+ return filterByFieldValues;
+ }
- EntityList.EntityElement elem = new EntityList.EntityElement();
- elem.name = e.getName();
- elem.type = entityTypeString;
- if (requireStatus) {
- String statusString;
- try {
- EntityStatus status = getStatus(e, entityType);
- statusString = status.name();
- } catch (FalconException e1) {
- statusString = "UNKNOWN";
- }
- elem.status = statusString;
+ private static ArrayList<String> getFilterByTags(String filterTags) {
+ ArrayList<String> filterTagsList = new ArrayList<String>();
+ if (!StringUtils.isEmpty(filterTags)) {
+ String[] splits = filterTags.split(",");
+ for (String tag : splits) {
+ filterTagsList.add(tag.trim());
+ }
+ }
+ return filterTagsList;
+ }
+
+ private List<String> getTags(Entity entity) {
+ String rawTags = null;
+ switch (entity.getEntityType()) {
+ case PROCESS:
+ rawTags = ((Process) entity).getTags();
+ break;
+
+ case FEED:
+ rawTags = ((Feed) entity).getTags();
+ break;
+
+ case CLUSTER:
+ rawTags = ((Cluster) entity).getTags();
+ break;
+
+ default:
+ break;
+ }
+
+ List<String> tags = new ArrayList<String>();
+ if (!StringUtils.isEmpty(rawTags)) {
+ for(String tag : rawTags.split(",")) {
+ LOG.info("Adding tag - "+ tag);
+ tags.add(tag.trim());
+ }
+ }
+
+ return tags;
+ }
+
+ private List<String> getPipelines(Entity entity) {
+ List<String> pipelines = new ArrayList<String>();
+ if (entity.getEntityType().equals(EntityType.PROCESS)) {
+ Process process = (Process) entity;
+ String pipelineString = process.getPipelines();
+ if (pipelineString != null) {
+ for (String pipeline : pipelineString.split(",")) {
+ pipelines.add(pipeline.trim());
}
- elements[i++] = elem;
}
- return new EntityList(elements);
- } catch (Exception e) {
- LOG.error("Unable to get list for entities for ({})", type, e);
- throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
+ return pipelines;
+ }
+
+ private String getStatusString(Entity entity) {
+ String statusString;
+ try {
+ statusString = getStatus(entity, entity.getEntityType()).name();
+ } catch (Throwable throwable) {
+ // Unable to fetch statusString, setting it to unknown for backwards compatibility
+ statusString = "UNKNOWN";
+ }
+ return statusString;
+ }
+
+ private boolean filterEntity(Entity entity, String entityStatus,
+ HashMap<String, String> filterByFieldsValues, ArrayList<String> filterByTags,
+ List<String> tags, List<String> pipelines) {
+ if (SecurityUtil.isAuthorizationEnabled() && !isEntityAuthorized(entity)) {
+ // the user who requested list query has no permission to access this entity. Skip this entity
+ return true;
+ }
+
+ return !((filterByTags.size() == 0 || tags.size() == 0 || !filterEntityByTags(filterByTags, tags))
+ && (filterByFieldsValues.size() == 0
+ || !filterEntityByFields(entity, filterByFieldsValues, entityStatus, pipelines)));
+
}
protected boolean isEntityAuthorized(Entity entity) {
@@ -516,17 +620,158 @@ public abstract class AbstractEntityManager {
entity.getEntityType().toString(), entity.getName(), CurrentUser.getProxyUgi());
} catch (Exception e) {
LOG.error("Authorization failed for entity=" + entity.getName()
- + " for user=" + CurrentUser.getUser() , e);
+ + " for user=" + CurrentUser.getUser(), e);
return false;
}
return true;
}
+    /**
+     * Checks whether an entity lacks any of the requested tags.
+     *
+     * @param filterTagsList tags that must all be present on the entity
+     * @param tags tags carried by the entity
+     * @return true if at least one requested tag is missing, false if all are present
+     *         (including when no tag is requested)
+     */
+    private boolean filterEntityByTags(ArrayList<String> filterTagsList, List<String> tags) {
+        return !tags.containsAll(filterTagsList);
+    }
+
+    /**
+     * Evaluates the filterBy criteria against an entity.
+     * <p>
+     * Criteria with an empty value are ignored. Each key is upper-cased and resolved with
+     * {@code EntityList.EntityFilterByFields.valueOf}. TYPE, NAME and STATUS are compared
+     * ignoring case; PIPELINES only applies to process entities and needs an exact match.
+     *
+     * @param entity entity being checked
+     * @param filterKeyVals filterBy criteria keyed by field name
+     * @param status status string of the entity
+     * @param pipelines pipelines of the entity
+     * @return true as soon as one criterion is not met, false otherwise
+     */
+    private boolean filterEntityByFields(Entity entity, HashMap<String, String> filterKeyVals,
+            String status, List<String> pipelines) {
+        for (Map.Entry<String, String> criterion : filterKeyVals.entrySet()) {
+            String expected = criterion.getValue();
+            if (StringUtils.isEmpty(expected)) {
+                continue; // nothing to filter
+            }
+
+            boolean mismatch;
+            switch (EntityList.EntityFilterByFields.valueOf(criterion.getKey().toUpperCase())) {
+            case TYPE:
+                mismatch = !entity.getEntityType().toString().equalsIgnoreCase(expected);
+                break;
+
+            case NAME:
+                mismatch = !entity.getName().equalsIgnoreCase(expected);
+                break;
+
+            case STATUS:
+                mismatch = !status.equalsIgnoreCase(expected);
+                break;
+
+            case PIPELINES:
+                mismatch = entity.getEntityType().equals(EntityType.PROCESS)
+                        && !pipelines.contains(expected);
+                break;
+
+            default:
+                mismatch = false;
+                break;
+            }
+
+            if (mismatch) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Sorts the given entities in place according to the orderBy parameter.
+     * <p>
+     * The parameter is upper-cased and resolved with {@code EntityList.EntityFieldList.valueOf}.
+     * TYPE orders by entity type and NAME by entity name; any other field, or an empty
+     * parameter, leaves the order untouched.
+     *
+     * @param entities entities to sort
+     * @param orderBy name of the field to order by; may be empty
+     * @return the same list instance that was passed in
+     */
+    private ArrayList<Entity> sortEntities(ArrayList<Entity> entities, String orderBy) {
+        if (StringUtils.isEmpty(orderBy)) {
+            return entities; // no sort requested
+        }
+
+        Comparator<Entity> ordering;
+        switch (EntityList.EntityFieldList.valueOf(orderBy.toUpperCase())) {
+        case TYPE:
+            ordering = new Comparator<Entity>() {
+                @Override
+                public int compare(Entity e1, Entity e2) {
+                    return e1.getEntityType().compareTo(e2.getEntityType());
+                }
+            };
+            break;
+
+        case NAME:
+            ordering = new Comparator<Entity>() {
+                @Override
+                public int compare(Entity e1, Entity e2) {
+                    return e1.getName().compareTo(e2.getName());
+                }
+            };
+            break;
+
+        default:
+            return entities; // field is not sortable
+        }
+
+        Collections.sort(entities, ordering);
+        return entities;
+    }
+
+    /**
+     * Computes how many elements a page holds, given the total number of elements, the index of
+     * the first element of the page and the requested page size. The element at {@code offset}
+     * is part of the page.
+     * <ul>
+     * <li>Size 10, offset 10: returns 0.</li>
+     * <li>Size 10, offset 5, count 3: returns 3 (elements 5, 6, 7).</li>
+     * <li>Size 10, offset 5, count 5 or more: returns 5 (elements 5 to 9).</li>
+     * <li>Size 10, offset 5, count -1: returns 5 (everything from the offset to the end).</li>
+     * </ul>
+     * The result is always between 0 and {@code arraySize - offset}, so it can be used directly
+     * as an array length or sub-list length.
+     *
+     * @param arraySize total number of elements available
+     * @param offset index of the first element to return; a negative offset addresses no element
+     *               and yields 0
+     * @param numresults maximum number of elements to return; any negative value means no limit
+     * @return number of elements in the page, never negative
+     */
+    protected int getRequiredNumberOfResults(int arraySize, int offset, int numresults) {
+        if (offset < 0 || offset >= arraySize) {
+            // No elements to return (also covers an empty array)
+            return 0;
+        }
+
+        int available = arraySize - offset;
+        if (numresults < 0) {
+            // no limit requested: return everything from the offset onwards
+            return available;
+        }
+        return Math.min(available, numresults);
+    }
+
+    /**
+     * Builds the response elements for one page of entities.
+     *
+     * @param offset index of the first entity of the page
+     * @param fields names of the optional fields to populate on each element
+     * @param entities all entities, already filtered and sorted by the caller
+     * @param pageCount number of entities in the page
+     * @return array with one element per entity of the page, in list order
+     */
+    private EntityElement[] buildEntityElements(Integer offset, HashSet<String> fields,
+            ArrayList<Entity> entities, int pageCount) {
+        EntityElement[] page = new EntityElement[pageCount];
+        List<Entity> window = entities.subList(offset, offset + pageCount);
+        for (int i = 0; i < window.size(); i++) {
+            page[i] = getEntityElement(window.get(i), fields);
+        }
+        return page;
+    }
+
+    /**
+     * Converts an entity into its list representation.
+     * Type and name are always set; status, pipelines and tags are only resolved when the
+     * corresponding field name ("status", "pipelines", "tags") is present in {@code fields}.
+     *
+     * @param entity entity to convert
+     * @param fields names of the optional fields requested by the caller
+     * @return populated element
+     */
+    private EntityElement getEntityElement(Entity entity, HashSet<String> fields) {
+        EntityElement element = new EntityElement();
+        element.type = entity.getEntityType().toString();
+        element.name = entity.getName();
+
+        // optional attributes, resolved on demand
+        if (fields.contains("status")) {
+            element.status = getStatusString(entity);
+        }
+        if (fields.contains("pipelines")) {
+            element.pipelines = getPipelines(entity);
+        }
+        if (fields.contains("tags")) {
+            element.tag = getTags(entity);
+        }
+        return element;
+    }
+
/**
* Returns the entity definition as an XML based on name.
*
- * @param type entity type
+ * @param type entity type
* @param entityName entity name
* @return String
*/
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
index 0df81cd..141ee9c 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java
@@ -77,8 +77,10 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
return lifeCycleValues;
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
public InstancesResult getRunningInstances(String type, String entity,
- String colo, List<LifeCycle> lifeCycles) {
+ String colo, List<LifeCycle> lifeCycles, String filterBy,
+ String orderBy, Integer offset, Integer numResults) {
checkColo(colo);
checkType(type);
try {
@@ -86,16 +88,25 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
validateNotEmpty("entityName", entity);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
Entity entityObject = EntityUtil.getEntity(type, entity);
- return wfEngine.getRunningInstances(entityObject, lifeCycles);
+ return getInstanceResultSubset(wfEngine.getRunningInstances(entityObject, lifeCycles),
+ filterBy, orderBy, offset, numResults);
} catch (Throwable e) {
LOG.error("Failed to get running instances", e);
throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
}
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
+    /**
+     * Lists the instances of an entity. Delegates to
+     * {@code getStatus} with exactly the same arguments.
+     *
+     * @param type entity type
+     * @param entity entity name
+     * @param startStr start of the time window, as a string
+     * @param endStr end of the time window, as a string
+     * @param colo colo to query
+     * @param lifeCycles lifecycles to include
+     * @param filterBy filter criteria applied to the instances
+     * @param orderBy field used to order the instances
+     * @param offset index of the first instance to return
+     * @param numResults maximum number of instances to return
+     * @return the result produced by {@code getStatus}
+     */
+    public InstancesResult getInstances(String type, String entity,
+            String startStr, String endStr,
+            String colo, List<LifeCycle> lifeCycles,
+            String filterBy, String orderBy,
+            Integer offset, Integer numResults) {
+        return getStatus(type, entity, startStr, endStr, colo, lifeCycles,
+                filterBy, orderBy, offset, numResults);
+    }
public InstancesResult getStatus(String type, String entity, String startStr, String endStr,
- String colo, List<LifeCycle> lifeCycles) {
+ String colo, List<LifeCycle> lifeCycles,
+ String filterBy, String orderBy, Integer offset, Integer numResults) {
checkColo(colo);
checkType(type);
try {
@@ -106,8 +117,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
Entity entityObject = EntityUtil.getEntity(type, entity);
// LifeCycle lifeCycleObject = EntityUtil.getLifeCycle(lifeCycle);
AbstractWorkflowEngine wfEngine = getWorkflowEngine();
- return wfEngine.getStatus(
- entityObject, start, end, lifeCycles);
+ return getInstanceResultSubset(wfEngine.getStatus(entityObject, start, end, lifeCycles),
+ filterBy, orderBy, offset, numResults);
} catch (Throwable e) {
LOG.error("Failed to get instances status", e);
throw FalconWebException
@@ -137,13 +148,14 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
public InstancesResult getLogs(String type, String entity, String startStr,
String endStr, String colo, String runId,
- List<LifeCycle> lifeCycles) {
+ List<LifeCycle> lifeCycles,
+ String filterBy, String orderBy, Integer offset, Integer numResults) {
try {
lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type);
- // TODO getStatus does all validations and filters clusters
+ // getStatus does all validations and filters clusters
InstancesResult result = getStatus(type, entity, startStr, endStr,
- colo, lifeCycles);
+ colo, lifeCycles, filterBy, orderBy, offset, numResults);
LogProvider logProvider = new LogProvider();
Entity entityObject = EntityUtil.getEntity(type, entity);
for (Instance instance : result.getInstances()) {
@@ -157,6 +169,129 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager {
}
}
+    /**
+     * Applies filtering, ordering and pagination to a result set of instances.
+     * <p>
+     * When the result set carries no instance array, an empty array is set on it and the same
+     * object is returned. Otherwise a new result with the original message is returned, holding
+     * the filtered instances of the requested page (sorted by orderBy), or an empty array when
+     * the page is empty.
+     *
+     * @param resultSet instances returned by the workflow engine
+     * @param filterBy filter criteria, parsed with {@code getFilterByFieldsValues}
+     * @param orderBy field used to order the instances
+     * @param offset index of the first instance of the page
+     * @param numResults maximum number of instances in the page
+     * @return result restricted to the requested page
+     */
+    private InstancesResult getInstanceResultSubset(InstancesResult resultSet, String filterBy, String orderBy,
+            Integer offset, Integer numResults) {
+
+        if (resultSet.getInstances() == null) {
+            // return the empty resultSet
+            resultSet.setInstances(new Instance[0]);
+            return resultSet;
+        }
+
+        // Filter instances
+        ArrayList<Instance> matching = filteredInstanceSet(
+                resultSet, new ArrayList<Instance>(), getFilterByFieldsValues(filterBy));
+
+        int pageCount = super.getRequiredNumberOfResults(matching.size(), offset, numResults);
+        if (pageCount == 0) {
+            // return empty result set
+            return new InstancesResult(resultSet.getMessage(), new Instance[0]);
+        }
+
+        // Sort using orderBy, then cut out the requested page
+        List<Instance> page = sortInstances(matching, orderBy).subList(offset, offset + pageCount);
+        return new InstancesResult(resultSet.getMessage(), page.toArray(new Instance[pageCount]));
+    }
+
+    /**
+     * Appends to {@code instanceSet} every instance of {@code resultSet} that satisfies all
+     * filterBy criteria. With no criteria, all instances are appended.
+     *
+     * @param resultSet result whose instances are examined
+     * @param instanceSet list receiving the matching instances
+     * @param filterByFieldsValues filterBy criteria keyed by field name
+     * @return {@code instanceSet}, for convenience
+     */
+    private ArrayList<Instance> filteredInstanceSet(InstancesResult resultSet, ArrayList<Instance> instanceSet,
+            HashMap<String, String> filterByFieldsValues) {
+
+        for (Instance instance : resultSet.getInstances()) {
+            if (matchesAllFilters(instance, filterByFieldsValues)) {
+                instanceSet.add(instance);
+            }
+        }
+        return instanceSet;
+    }
+
+    /**
+     * Checks one instance against every filterBy criterion.
+     * <p>
+     * Criteria with a null or empty value are ignored. An instance that has no value for a
+     * filtered attribute (cluster, source cluster or start time) does not match that criterion;
+     * it is excluded instead of failing the whole request. An unknown filter key or an
+     * unparsable date is logged and reported as a BAD_REQUEST instance exception.
+     */
+    private boolean matchesAllFilters(Instance instance, HashMap<String, String> filterByFieldsValues) {
+        for (Map.Entry<String, String> pair : filterByFieldsValues.entrySet()) {
+            String filterValue = pair.getValue();
+            if (filterValue == null || filterValue.isEmpty()) {
+                continue;
+            }
+
+            boolean matches;
+            try {
+                switch (InstancesResult.InstanceFilterFields.valueOf(pair.getKey().toUpperCase())) {
+                case STATUS:
+                    String status = "";
+                    if (instance.getStatus() != null) {
+                        status = instance.getStatus().toString();
+                    }
+                    matches = status.equalsIgnoreCase(filterValue);
+                    break;
+                case CLUSTER:
+                    // equalsIgnoreCase(null) is false, so a missing cluster never matches
+                    matches = filterValue.equalsIgnoreCase(instance.getCluster());
+                    break;
+                case SOURCECLUSTER:
+                    matches = filterValue.equalsIgnoreCase(instance.getSourceCluster());
+                    break;
+                case STARTEDAFTER:
+                    // parse first so that an invalid date is always reported
+                    java.util.Date threshold = EntityUtil.parseDateUTC(filterValue);
+                    matches = instance.getStartTime() != null
+                            && !instance.getStartTime().before(threshold);
+                    break;
+                default:
+                    matches = true;
+                    break;
+                }
+            } catch (Exception e) {
+                LOG.error("Invalid entry for filterBy field", e);
+                throw FalconWebException.newInstanceException(e, Response.Status.BAD_REQUEST);
+            }
+
+            if (!matches) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Sorts the instances in place by the requested field.
+     * <p>
+     * Supported values are "status", "cluster", "startTime" and "endTime"; any other value,
+     * including null or empty, leaves the order untouched. Instances lacking a cluster, start
+     * time or end time are ordered before those that have one. When sorting by status, an
+     * instance whose status is null is assigned {@code WorkflowStatus.ERROR} as a side effect
+     * of the comparison.
+     *
+     * @param instanceSet instances to sort
+     * @param orderBy name of the field to order by
+     * @return the same list instance that was passed in
+     */
+    private ArrayList<Instance> sortInstances(ArrayList<Instance> instanceSet, String orderBy) {
+        if ("status".equals(orderBy)) {
+            Collections.sort(instanceSet, new Comparator<Instance>() {
+                @Override
+                public int compare(Instance i1, Instance i2) {
+                    if (i1.getStatus() == null) {
+                        i1.status = InstancesResult.WorkflowStatus.ERROR;
+                    }
+                    if (i2.getStatus() == null) {
+                        i2.status = InstancesResult.WorkflowStatus.ERROR;
+                    }
+                    return i1.getStatus().name().compareTo(i2.getStatus().name());
+                }
+            });
+        } else if ("cluster".equals(orderBy)) {
+            Collections.sort(instanceSet, new Comparator<Instance>() {
+                @Override
+                public int compare(Instance i1, Instance i2) {
+                    return compareNullsFirst(i1.getCluster(), i2.getCluster());
+                }
+            });
+        } else if ("startTime".equals(orderBy)) {
+            Collections.sort(instanceSet, new Comparator<Instance>() {
+                @Override
+                public int compare(Instance i1, Instance i2) {
+                    return compareNullsFirst(i1.getStartTime(), i2.getStartTime());
+                }
+            });
+        } else if ("endTime".equals(orderBy)) {
+            Collections.sort(instanceSet, new Comparator<Instance>() {
+                @Override
+                public int compare(Instance i1, Instance i2) {
+                    return compareNullsFirst(i1.getEndTime(), i2.getEndTime());
+                }
+            });
+        } //Default : no sort
+        return instanceSet;
+    }
+
+    /**
+     * Compares two values by natural order, treating null as smaller than any non-null value
+     * so that comparators never fail on instances with a missing attribute.
+     */
+    private static <T extends Comparable<? super T>> int compareNullsFirst(T left, T right) {
+        if (left == null) {
+            return right == null ? 0 : -1;
+        }
+        if (right == null) {
+            return 1;
+        }
+        return left.compareTo(right);
+    }
+
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
+
public InstancesResult getInstanceParams(String type,
String entity, String startTime,
String colo, List<LifeCycle> lifeCycles) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/62b2af02/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 36ce6c9..970ea74 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -71,6 +71,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
return processInstanceManagerChannels.get(colo);
}
+ //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck
@GET
@Path("running/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
@@ -80,12 +81,47 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("entityType") @PathParam("type") final String type,
@Dimension("entityName") @PathParam("entity") final String entity,
@Dimension("colo") @QueryParam("colo") String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") final String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") final String orderBy,
+ @DefaultValue("0") @QueryParam("offset") final Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") final Integer resultsPerPage) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).
- invoke("getRunningInstances", type, entity, colo, lifeCycles);
+ invoke("getRunningInstances", type, entity, colo, lifeCycles,
+ filterBy, orderBy, offset, resultsPerPage);
+ }
+ }.execute(colo, type, entity);
+ }
+
+ /*
+ Lists the instances of an entity; served on GET list/{type}/{entity}.
+ The call is forwarded to the "getInstances" operation of the instance manager obtained for
+ the colo, passing the start/end window, the lifecycles and the filterBy, orderBy, offset and
+ numResults query parameters through unchanged.
+ getStatus(...) actually gets all instances, filtered by a specific status. This is
+ a better named API which achieves the same result.
+ */
+ @GET
+ @Path("list/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-list")
+ @Override
+ public InstancesResult getInstances(
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Dimension("colo") @QueryParam("colo") final String colo,
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") final String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") final String orderBy,
+ @DefaultValue("0") @QueryParam("offset") final Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") final Integer resultsPerPage) {
+ return new InstanceProxy() {
+ @Override
+ protected InstancesResult doExecute(String colo) throws FalconException {
+ return getInstanceManager(colo).invoke("getInstances",
+ type, entity, startStr, endStr, colo, lifeCycles,
+ filterBy, orderBy, offset, resultsPerPage);
}
}.execute(colo, type, entity);
}
@@ -101,12 +137,17 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("start-time") @QueryParam("start") final String startStr,
@Dimension("end-time") @QueryParam("end") final String endStr,
@Dimension("colo") @QueryParam("colo") final String colo,
- @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") final String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") final String orderBy,
+ @DefaultValue("0") @QueryParam("offset") final Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") final Integer resultsPerPage) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getStatus",
- type, entity, startStr, endStr, colo, lifeCycles);
+ type, entity, startStr, endStr, colo, lifeCycles,
+ filterBy, orderBy, offset, resultsPerPage);
}
}.execute(colo, type, entity);
}
@@ -165,12 +206,17 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Dimension("end-time") @QueryParam("end") final String endStr,
@Dimension("colo") @QueryParam("colo") final String colo,
@Dimension("run-id") @QueryParam("runid") final String runId,
- @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles) {
+ @Dimension("lifecycle") @QueryParam("lifecycle") final List<LifeCycle> lifeCycles,
+ @DefaultValue("") @QueryParam("filterBy") final String filterBy,
+ @DefaultValue("") @QueryParam("orderBy") final String orderBy,
+ @DefaultValue("0") @QueryParam("offset") final Integer offset,
+ @DefaultValue("-1") @QueryParam("numResults") final Integer resultsPerPage) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("getLogs",
- type, entity, startStr, endStr, colo, runId, lifeCycles);
+ type, entity, startStr, endStr, colo, runId, lifeCycles,
+ filterBy, orderBy, offset, resultsPerPage);
}
}.execute(colo, type, entity);
}
@@ -269,6 +315,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
}
}.execute(colo, type, entity);
}
+ //RESUME CHECKSTYLE CHECK ParameterNumberCheck
private abstract class InstanceProxy {