You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@falcon.apache.org by sr...@apache.org on 2013/04/26 17:50:27 UTC
[11/47] Fixes for Checkstyle
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
index 739d655..9a41d33 100644
--- a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
+++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java
@@ -18,11 +18,6 @@
package org.apache.falcon.resource;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.Response;
-
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityUtil;
@@ -32,6 +27,11 @@ import org.apache.falcon.entity.v0.UnschedulableEntityException;
import org.apache.falcon.monitors.Dimension;
import org.apache.log4j.Logger;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+
/**
* REST resource of allowed actions on Schedulable Entities Only Process and
* Feed can have schedulable actions
@@ -42,13 +42,15 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
/**
* Schedules an submitted entity immediately
- *
+ *
* @param type
* @param entity
* @return APIResult
*/
- public APIResult schedule(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
- @Dimension("entityName") @PathParam("entity") String entity, @Dimension("colo") @PathParam("colo") String colo) {
+ public APIResult schedule(
+ @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
+ @Dimension("entityName") @PathParam("entity") String entity,
+ @Dimension("colo") @PathParam("colo") String colo) {
checkColo(colo);
try {
audit(request, entity, type, "SCHEDULED");
@@ -68,11 +70,12 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
/**
* Submits a new entity and schedules it immediately
- *
+ *
* @param type
* @return
*/
- public APIResult submitAndSchedule(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
+ public APIResult submitAndSchedule(
+ @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@Dimension("colo") @PathParam("colo") String colo) {
checkColo(colo);
try {
@@ -80,7 +83,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
audit(request, "STREAMED_DATA", type, "SUBMIT_AND_SCHEDULE");
Entity entity = submitInternal(request, type);
scheduleInternal(type, entity.getName());
- return new APIResult(APIResult.Status.SUCCEEDED, entity.getName() + "(" + type + ") scheduled successfully");
+ return new APIResult(APIResult.Status.SUCCEEDED,
+ entity.getName() + "(" + type + ") scheduled successfully");
} catch (Throwable e) {
LOG.error("Unable to submit and schedule ", e);
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
@@ -89,22 +93,25 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
/**
* Suspends a running entity
- *
+ *
* @param type
* @param entity
* @return APIResult
*/
- public APIResult suspend(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
- @Dimension("entityName") @PathParam("entity") String entity, @Dimension("entityName") @PathParam("entity") String colo) {
+ public APIResult suspend(
+ @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
+ @Dimension("entityName") @PathParam("entity") String entity,
+ @Dimension("entityName") @PathParam("entity") String colo) {
checkColo(colo);
try {
checkSchedulableEntity(type);
audit(request, entity, type, "SUSPEND");
Entity entityObj = EntityUtil.getEntity(type, entity);
- if (getWorkflowEngine().isActive(entityObj))
+ if (getWorkflowEngine().isActive(entityObj)) {
getWorkflowEngine().suspend(entityObj);
- else
+ } else {
throw new FalconException(entity + "(" + type + ") is not scheduled");
+ }
return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") suspended successfully");
} catch (Throwable e) {
LOG.error("Unable to suspend entity", e);
@@ -114,23 +121,26 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
/**
* Resumes a suspended entity
- *
+ *
* @param type
* @param entity
* @return APIResult
*/
- public APIResult resume(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
- @Dimension("entityName") @PathParam("entity") String entity, @Dimension("colo") @PathParam("colo") String colo) {
+ public APIResult resume(
+ @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
+ @Dimension("entityName") @PathParam("entity") String entity,
+ @Dimension("colo") @PathParam("colo") String colo) {
checkColo(colo);
try {
checkSchedulableEntity(type);
audit(request, entity, type, "RESUME");
Entity entityObj = EntityUtil.getEntity(type, entity);
- if (getWorkflowEngine().isActive(entityObj))
+ if (getWorkflowEngine().isActive(entityObj)) {
getWorkflowEngine().resume(entityObj);
- else
+ } else {
throw new FalconException(entity + "(" + type + ") is not scheduled");
+ }
return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + type + ") resumed successfully");
} catch (Throwable e) {
LOG.error("Unable to resume entity", e);
@@ -141,7 +151,8 @@ public abstract class AbstractSchedulableEntityManager extends AbstractEntityMan
private void checkSchedulableEntity(String type) throws UnschedulableEntityException {
EntityType entityType = EntityType.valueOf(type.toUpperCase());
if (!entityType.isSchedulable()) {
- throw new UnschedulableEntityException("Entity type (" + type + ") " + " cannot be Scheduled/Suspended/Resumed");
+ throw new UnschedulableEntityException(
+ "Entity type (" + type + ") " + " cannot be Scheduled/Suspended/Resumed");
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java
index 47c70ed..7b15b96 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/AbstractChannel.java
@@ -30,7 +30,7 @@ public abstract class AbstractChannel implements Channel {
protected Method getMethod(Class service, String methodName, Object... args)
throws FalconException {
- MethodKey methodKey = new MethodKey(methodName, args);
+ MethodKey methodKey = new MethodKey(methodName, args);
Method method = methods.get(methodKey);
if (method == null) {
for (Method item : service.getDeclaredMethods()) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
index 64d39ba..9873a96 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/HTTPChannel.java
@@ -18,25 +18,9 @@
package org.apache.falcon.resource.channel;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Method;
-import java.util.Properties;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.HttpMethod;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status.Family;
-import javax.ws.rs.core.UriBuilder;
-
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.falcon.FalconException;
import org.apache.falcon.resource.proxy.BufferedRequest;
import org.apache.falcon.security.CurrentUser;
@@ -44,9 +28,15 @@ import org.apache.falcon.util.DeploymentProperties;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.log4j.Logger;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.ClientResponse;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status.Family;
+import javax.ws.rs.core.UriBuilder;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.Properties;
public class HTTPChannel extends AbstractChannel {
private static final Logger LOG = Logger.getLogger(HTTPChannel.class);
@@ -121,8 +111,9 @@ public class HTTPChannel extends AbstractChannel {
}
private boolean isPost(String httpMethod) {
- if(httpMethod.equals("POST") || httpMethod.equals("PUT"))
+ if (httpMethod.equals("POST") || httpMethod.equals("PUT")) {
return true;
+ }
return false;
}
@@ -181,13 +172,19 @@ public class HTTPChannel extends AbstractChannel {
private String getHttpMethod(Method method) {
PUT put = method.getAnnotation(PUT.class);
- if (put != null) return HttpMethod.PUT;
+ if (put != null) {
+ return HttpMethod.PUT;
+ }
POST post = method.getAnnotation(POST.class);
- if (post != null) return HttpMethod.POST;
+ if (post != null) {
+ return HttpMethod.POST;
+ }
DELETE delete = method.getAnnotation(DELETE.class);
- if (delete != null) return HttpMethod.DELETE;
+ if (delete != null) {
+ return HttpMethod.DELETE;
+ }
return HttpMethod.GET;
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/channel/IPCChannel.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/IPCChannel.java b/prism/src/main/java/org/apache/falcon/resource/channel/IPCChannel.java
index 9691040..b4d88f9 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/IPCChannel.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/IPCChannel.java
@@ -18,8 +18,6 @@
package org.apache.falcon.resource.channel;
-import java.lang.reflect.Method;
-
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
@@ -27,6 +25,8 @@ import org.apache.falcon.resource.AbstractEntityManager;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.log4j.Logger;
+import java.lang.reflect.Method;
+
public class IPCChannel extends AbstractChannel {
private static final Logger LOG = Logger.getLogger(IPCChannel.class);
private AbstractEntityManager service;
@@ -46,10 +46,16 @@ public class IPCChannel extends AbstractChannel {
return (T) method.invoke(service, args);
} catch (Exception e) {
Throwable cause = e.getCause();
- if (cause != null) {
- if (cause instanceof FalconWebException) throw (FalconWebException) cause;
- if (cause instanceof FalconRuntimException) throw (FalconRuntimException) cause;
- if (cause instanceof FalconException) throw (FalconException) cause;
+ if (cause != null) {
+ if (cause instanceof FalconWebException) {
+ throw (FalconWebException) cause;
+ }
+ if (cause instanceof FalconRuntimException) {
+ throw (FalconRuntimException) cause;
+ }
+ if (cause instanceof FalconException) {
+ throw (FalconException) cause;
+ }
}
throw new FalconException("Unable to invoke on the channel " + methodName +
" on service : " + service.getClass().getName() + cause);
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java b/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java
index ceb99e6..e4fddc2 100644
--- a/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java
+++ b/prism/src/main/java/org/apache/falcon/resource/channel/MethodKey.java
@@ -42,19 +42,25 @@ public class MethodKey {
@Override
public boolean equals(Object methodRHS) {
- if (this == methodRHS) return true;
+ if (this == methodRHS) {
+ return true;
+ }
if (methodRHS == null ||
- getClass() != methodRHS.getClass()) return false;
+ getClass() != methodRHS.getClass()) {
+ return false;
+ }
MethodKey methodKey = (MethodKey) methodRHS;
if (name != null ? !name.equals(methodKey.name) :
- methodKey.name != null) return false;
+ methodKey.name != null) {
+ return false;
+ }
boolean matching = true;
for (int index = 0; index < argClasses.length; index++) {
if (argClasses[index] != null && methodKey.argClasses[index] != null &&
!methodKey.argClasses[index].isAssignableFrom(argClasses[index])) {
- matching = false;
+ matching = false;
}
}
return matching;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/provider/JAXBContextResolver.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/provider/JAXBContextResolver.java b/prism/src/main/java/org/apache/falcon/resource/provider/JAXBContextResolver.java
index 5e6564e..11d0882 100644
--- a/prism/src/main/java/org/apache/falcon/resource/provider/JAXBContextResolver.java
+++ b/prism/src/main/java/org/apache/falcon/resource/provider/JAXBContextResolver.java
@@ -18,6 +18,11 @@
package org.apache.falcon.resource.provider;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import org.apache.falcon.resource.APIResult;
+import org.apache.falcon.resource.InstancesResult;
+
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.ext.ContextResolver;
@@ -25,17 +30,13 @@ import javax.ws.rs.ext.Provider;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
-import com.sun.jersey.api.json.JSONConfiguration;
-import com.sun.jersey.api.json.JSONJAXBContext;
-import org.apache.falcon.resource.APIResult;
-import org.apache.falcon.resource.InstancesResult;
-
@Provider
@Produces(MediaType.APPLICATION_JSON)
-public class JAXBContextResolver implements ContextResolver<JAXBContext>{
+public class JAXBContextResolver implements ContextResolver<JAXBContext> {
private static JAXBContext context;
- private static Class<?>[] types = { InstancesResult.class, APIResult.class, InstancesResult.Instance.class,
- InstancesResult.WorkflowStatus.class };
+ private static Class<?>[] types = {InstancesResult.class, APIResult.class, InstancesResult.Instance.class,
+ InstancesResult.WorkflowStatus.class};
+
static {
try {
context = new JSONJAXBContext(JSONConfiguration.natural().build(), types);
@@ -44,8 +45,9 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext>{
}
}
- public JAXBContextResolver() { }
-
+ public JAXBContextResolver() {
+ }
+
public JAXBContext getContext(Class<?> objectType) {
for (Class<?> type : types) {
if (type == objectType) {
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/proxy/BufferedRequest.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/BufferedRequest.java b/prism/src/main/java/org/apache/falcon/resource/proxy/BufferedRequest.java
index 4d1f315..e797fb6 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/BufferedRequest.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/BufferedRequest.java
@@ -18,8 +18,8 @@
package org.apache.falcon.resource.proxy;
-import org.apache.hadoop.io.IOUtils;
import org.apache.falcon.FalconWebException;
+import org.apache.hadoop.io.IOUtils;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletInputStream;
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
index 80a0cad..c637984 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/InstanceManagerProxy.java
@@ -18,23 +18,6 @@
package org.apache.falcon.resource.proxy;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
@@ -47,6 +30,13 @@ import org.apache.falcon.resource.InstancesResult.Instance;
import org.apache.falcon.resource.channel.Channel;
import org.apache.falcon.resource.channel.ChannelFactory;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.*;
+
@Path("instance")
public class InstanceManagerProxy extends AbstractInstanceManager {
@@ -78,11 +68,11 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@GET
@Path("running/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
- @Monitored(event="running")
+ @Monitored(event = "running")
@Override
public InstancesResult getRunningInstances(@Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("colo") @QueryParam("colo") String colo) {
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("colo") @QueryParam("colo") String colo) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
@@ -95,13 +85,13 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@GET
@Path("status/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
- @Monitored(event="instance-status")
+ @Monitored(event = "instance-status")
@Override
public InstancesResult getStatus(@Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Dimension("colo") @QueryParam("colo") final String colo) {
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Dimension("colo") @QueryParam("colo") final String colo) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
@@ -110,19 +100,19 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
}
}.execute(colo, type, entity);
}
-
- @GET
- @Path("logs/{type}/{entity}")
- @Produces(MediaType.APPLICATION_JSON)
- @Monitored(event = "instance-logs")
- @Override
- public InstancesResult getLogs(
- @Dimension("type") @PathParam("type") final String type,
- @Dimension("entity") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Dimension("colo") @QueryParam("colo") final String colo,
- @Dimension("run-id") @QueryParam("runid") final String runId) {
+
+ @GET
+ @Path("logs/{type}/{entity}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @Monitored(event = "instance-logs")
+ @Override
+ public InstancesResult getLogs(
+ @Dimension("type") @PathParam("type") final String type,
+ @Dimension("entity") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Dimension("colo") @QueryParam("colo") final String colo,
+ @Dimension("run-id") @QueryParam("runid") final String runId) {
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
@@ -135,7 +125,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@POST
@Path("kill/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
- @Monitored(event="kill-instance")
+ @Monitored(event = "kill-instance")
@Override
public InstancesResult killInstance(@Context HttpServletRequest request,
@Dimension("entityType") @PathParam("type") final String type,
@@ -149,7 +139,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("killInstance",
- bufferedRequest, type, entity, startStr, endStr, colo);
+ bufferedRequest, type, entity, startStr, endStr, colo);
}
}.execute(colo, type, entity);
}
@@ -157,7 +147,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@POST
@Path("suspend/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
- @Monitored(event="suspend-instance")
+ @Monitored(event = "suspend-instance")
@Override
public InstancesResult suspendInstance(@Context HttpServletRequest request,
@Dimension("entityType") @PathParam("type") final String type,
@@ -170,7 +160,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("suspendInstance",
- bufferedRequest, type, entity, startStr, endStr, colo);
+ bufferedRequest, type, entity, startStr, endStr, colo);
}
}.execute(colo, type, entity);
}
@@ -178,7 +168,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@POST
@Path("resume/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
- @Monitored(event="resume-instance")
+ @Monitored(event = "resume-instance")
@Override
public InstancesResult resumeInstance(@Context HttpServletRequest request,
@Dimension("entityType") @PathParam("type") final String type,
@@ -192,7 +182,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("resumeInstance",
- bufferedRequest, type, entity, startStr, endStr, colo);
+ bufferedRequest, type, entity, startStr, endStr, colo);
}
}.execute(colo, type, entity);
}
@@ -200,21 +190,21 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
@POST
@Path("rerun/{type}/{entity}")
@Produces(MediaType.APPLICATION_JSON)
- @Monitored(event="re-run-instance")
+ @Monitored(event = "re-run-instance")
@Override
public InstancesResult reRunInstance(@Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("start-time") @QueryParam("start") final String startStr,
- @Dimension("end-time") @QueryParam("end") final String endStr,
- @Context HttpServletRequest request,
- @Dimension("colo") @QueryParam("colo") String colo) {
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("start-time") @QueryParam("start") final String startStr,
+ @Dimension("end-time") @QueryParam("end") final String endStr,
+ @Context HttpServletRequest request,
+ @Dimension("colo") @QueryParam("colo") String colo) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
return new InstanceProxy() {
@Override
protected InstancesResult doExecute(String colo) throws FalconException {
return getInstanceManager(colo).invoke("reRunInstance",
- type, entity, startStr, endStr, bufferedRequest, colo);
+ type, entity, startStr, endStr, bufferedRequest, colo);
}
}.execute(colo, type, entity);
}
@@ -225,7 +215,7 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
Set<String> colos = getColosFromExpression(coloExpr, type, name);
Map<String, InstancesResult> results = new HashMap<String, InstancesResult>();
- for (String colo:colos) {
+ for (String colo : colos) {
try {
APIResult resultHolder = doExecute(colo);
if (resultHolder instanceof InstancesResult) {
@@ -249,26 +239,29 @@ public class InstanceManagerProxy extends AbstractInstanceManager {
protected abstract InstancesResult doExecute(String colo) throws FalconException;
}
-
+
private InstancesResult consolidateInstanceResult(Map<String, InstancesResult> results) {
- if (results == null || results.isEmpty())
+ if (results == null || results.isEmpty()) {
return null;
+ }
StringBuilder message = new StringBuilder();
StringBuilder requestIds = new StringBuilder();
List<Instance> instances = new ArrayList<Instance>();
int statusCount = 0;
- for (String colo:results.keySet()) {
+ for (String colo : results.keySet()) {
InstancesResult result = results.get(colo);
message.append(colo).append('/').append(result.getMessage()).append('\n');
requestIds.append(colo).append('/').append(result.getRequestId()).append('\n');
statusCount += result.getStatus().ordinal();
- if (result.getInstances() == null) continue;
+ if (result.getInstances() == null) {
+ continue;
+ }
for (Instance instance : result.getInstances()) {
- instance.instance = instance.getInstance();
- instances.add(instance);
+ instance.instance = instance.getInstance();
+ instances.add(instance);
}
}
Instance[] arrInstances = new Instance[instances.size()];
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
index 418b7b7..aa1ef6d 100644
--- a/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
+++ b/prism/src/main/java/org/apache/falcon/resource/proxy/SchedulableEntityManagerProxy.java
@@ -18,25 +18,6 @@
package org.apache.falcon.resource.proxy;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconRuntimException;
import org.apache.falcon.FalconWebException;
@@ -53,6 +34,17 @@ import org.apache.falcon.resource.channel.Channel;
import org.apache.falcon.resource.channel.ChannelFactory;
import org.apache.falcon.util.DeploymentUtil;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.*;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
@Path("entities")
public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityManager {
private static final String PRISM_TAG = "prism";
@@ -69,7 +61,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
for (String colo : colos) {
initializeFor(colo);
}
-
+
DeploymentUtil.setPrismMode();
} catch (FalconException e) {
throw new FalconRuntimException("Unable to initialize channels", e);
@@ -96,18 +88,20 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
}
private BufferedRequest getBufferedRequest(HttpServletRequest request) {
- if (request instanceof BufferedRequest)
+ if (request instanceof BufferedRequest) {
return (BufferedRequest) request;
+ }
return new BufferedRequest(request);
}
@POST
@Path("submit/{type}")
- @Consumes({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Monitored(event = "submit")
@Override
- public APIResult submit(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
+ public APIResult submit(
+ @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
@Dimension("colo") @QueryParam("colo") final String ignore) {
final HttpServletRequest bufferedRequest = getBufferedRequest(request);
@@ -137,8 +131,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@POST
@Path("validate/{type}")
- @Consumes({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Override
public APIResult validate(@Context HttpServletRequest request, @PathParam("type") String type) {
return super.validate(request, type);
@@ -146,15 +140,17 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@DELETE
@Path("delete/{type}/{entity}")
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Monitored(event = "delete")
@Override
- public APIResult delete(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity, @Dimension("colo") @QueryParam("colo") String ignore) {
+ public APIResult delete(
+ @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("colo") @QueryParam("colo") String ignore) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
Map<String, APIResult> results = new HashMap<String, APIResult>();
-
+
results.put("falcon", new EntityProxy(type, entity) {
@Override
public APIResult execute() {
@@ -167,7 +163,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
throw FalconWebException.newException(e, Response.Status.BAD_REQUEST);
}
}
-
+
@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entity, colo);
@@ -175,17 +171,18 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
}.execute());
if (!embeddedMode) {
- results.put(PRISM_TAG, super.delete(bufferedRequest, type, entity, currentColo));
+ results.put(PRISM_TAG, super.delete(bufferedRequest, type, entity, currentColo));
}
return consolidateResult(results);
}
@POST
@Path("update/{type}/{entity}")
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Monitored(event = "update")
@Override
- public APIResult update(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
+ public APIResult update(
+ @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
@Dimension("entityName") @PathParam("entity") final String entityName,
@Dimension("colo") @QueryParam("colo") String ignore) {
@@ -199,13 +196,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
oldColos.removeAll(mergedColos); //Old colos where delete should be called
Map<String, APIResult> results = new HashMap<String, APIResult>();
- if(!oldColos.isEmpty()) {
+ if (!oldColos.isEmpty()) {
results.put("delete", new EntityProxy(type, entityName) {
@Override
protected Set<String> getColosToApply() {
return oldColos;
}
-
+
@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("delete", bufferedRequest, type, entityName, colo);
@@ -213,13 +210,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
}.execute());
}
- if(!mergedColos.isEmpty()) {
+ if (!mergedColos.isEmpty()) {
results.put("update", new EntityProxy(type, entityName) {
@Override
protected Set<String> getColosToApply() {
return mergedColos;
}
-
+
@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("update", bufferedRequest, type, entityName, colo);
@@ -227,35 +224,35 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
}.execute());
}
- if(!newColos.isEmpty()) {
+ if (!newColos.isEmpty()) {
results.put("submit", new EntityProxy(type, entityName) {
@Override
protected Set<String> getColosToApply() {
return newColos;
}
-
+
@Override
protected APIResult doExecute(String colo) throws FalconException {
return getConfigSyncChannel(colo).invoke("submit", bufferedRequest, type, colo);
}
}.execute());
}
-
+
if (!embeddedMode) {
results.put(PRISM_TAG, super.update(bufferedRequest, type, entityName, currentColo));
}
-
+
return consolidateResult(results);
}
@GET
@Path("status/{type}/{entity}")
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Monitored(event = "status")
@Override
public APIResult getStatus(@Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("colo") @QueryParam("colo") final String coloExpr) throws FalconWebException {
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("colo") @QueryParam("colo") final String coloExpr) throws FalconWebException {
return new EntityProxy(type, entity) {
@Override
protected Set<String> getColosToApply() {
@@ -275,7 +272,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@Monitored(event = "dependencies")
@Override
public EntityList getDependencies(@Dimension("entityType") @PathParam("type") String type,
- @Dimension("entityName") @PathParam("entity") String entity) {
+ @Dimension("entityName") @PathParam("entity") String entity) {
return super.getDependencies(type, entity);
}
@@ -289,7 +286,7 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@GET
@Path("definition/{type}/{entity}")
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Override
public String getEntityDefinition(@PathParam("type") String type, @PathParam("entity") String entityName) {
return super.getEntityDefinition(type, entityName);
@@ -297,13 +294,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@POST
@Path("schedule/{type}/{entity}")
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Monitored(event = "schedule")
@Override
public APIResult schedule(@Context final HttpServletRequest request,
- @Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("colo") @QueryParam("colo") final String coloExpr) {
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("colo") @QueryParam("colo") final String coloExpr) {
final HttpServletRequest bufferedRequest = getBufferedRequest(request);
return new EntityProxy(type, entity) {
@@ -321,11 +318,12 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@POST
@Path("submitAndSchedule/{type}")
- @Consumes({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Consumes({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Monitored(event = "submitAndSchedule")
@Override
- public APIResult submitAndSchedule(@Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
+ public APIResult submitAndSchedule(
+ @Context HttpServletRequest request, @Dimension("entityType") @PathParam("type") String type,
@Dimension("colo") @QueryParam("colo") String coloExpr) {
BufferedRequest bufferedRequest = new BufferedRequest(request);
String entity = getEntity(bufferedRequest, type).getName();
@@ -337,13 +335,13 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@POST
@Path("suspend/{type}/{entity}")
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Monitored(event = "suspend")
@Override
public APIResult suspend(@Context final HttpServletRequest request,
- @Dimension("entityType") @PathParam("type") final String type,
- @Dimension("entityName") @PathParam("entity") final String entity,
- @Dimension("colo") @QueryParam("colo") final String coloExpr) {
+ @Dimension("entityType") @PathParam("type") final String type,
+ @Dimension("entityName") @PathParam("entity") final String entity,
+ @Dimension("colo") @QueryParam("colo") final String coloExpr) {
final HttpServletRequest bufferedRequest = new BufferedRequest(request);
return new EntityProxy(type, entity) {
@@ -361,10 +359,11 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
@POST
@Path("resume/{type}/{entity}")
- @Produces({ MediaType.TEXT_XML, MediaType.TEXT_PLAIN })
+ @Produces({MediaType.TEXT_XML, MediaType.TEXT_PLAIN})
@Monitored(event = "resume")
@Override
- public APIResult resume(@Context final HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
+ public APIResult resume(
+ @Context final HttpServletRequest request, @Dimension("entityType") @PathParam("type") final String type,
@Dimension("entityName") @PathParam("entity") final String entity,
@Dimension("colo") @QueryParam("colo") final String coloExpr) {
@@ -400,7 +399,8 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
try {
results.put(colo, doExecute(colo));
} catch (FalconException e) {
- results.put(colo, new APIResult(APIResult.Status.FAILED, e.getClass().getName() + "::" + e.getMessage()));
+ results.put(colo,
+ new APIResult(APIResult.Status.FAILED, e.getClass().getName() + "::" + e.getMessage()));
}
}
APIResult finalResult = consolidateResult(results);
@@ -417,10 +417,11 @@ public class SchedulableEntityManagerProxy extends AbstractSchedulableEntityMana
protected abstract APIResult doExecute(String colo) throws FalconException;
}
-
+
private APIResult consolidateResult(Map<String, APIResult> results) {
- if (results == null || results.size() == 0)
+ if (results == null || results.size() == 0) {
return null;
+ }
StringBuilder buffer = new StringBuilder();
StringBuilder requestIds = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java b/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
index 1ef12c0..e3aedb1 100644
--- a/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
+++ b/prism/src/main/java/org/apache/falcon/security/BasicAuthFilter.java
@@ -37,9 +37,9 @@ public class BasicAuthFilter implements Filter {
private static final Logger LOG = Logger.getLogger(BasicAuthFilter.class);
private static final String GUEST = "guest";
-
- private static final Set<String> BLACK_LISTED_USER = new HashSet<String>(
- Arrays.asList(new String[] { "hdfs", "mapred", "oozie", "falcon" }));
+
+ private static final Set<String> BLACK_LISTED_USER = new HashSet<String>(
+ Arrays.asList(new String[]{"hdfs", "mapred", "oozie", "falcon"}));
private boolean secure;
@@ -69,7 +69,7 @@ public class BasicAuthFilter implements Filter {
String user;
String requestId = UUID.randomUUID().toString();
-
+
if (!secure) {
user = GUEST;
} else {
@@ -77,13 +77,12 @@ public class BasicAuthFilter implements Filter {
}
if (user == null || user.isEmpty()) {
- httpResponse.sendError(Response.Status.BAD_REQUEST.getStatusCode(),
- "Remote user header can't be empty");
- } else if(BLACK_LISTED_USER.contains(user)){
- httpResponse.sendError(Response.Status.BAD_REQUEST.getStatusCode(),
- "Remote user header can't be superusers:"+BLACK_LISTED_USER);
- }
- else {
+ httpResponse.sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+ "Remote user header can't be empty");
+ } else if (BLACK_LISTED_USER.contains(user)) {
+ httpResponse.sendError(Response.Status.BAD_REQUEST.getStatusCode(),
+ "Remote user header can't be superusers:" + BLACK_LISTED_USER);
+ } else {
CurrentUser.authenticate(user);
try {
NDC.push(user + ":" + httpRequest.getMethod() + "/" + httpRequest.getPathInfo());
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
index 45561d7..4e48488 100644
--- a/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
+++ b/prism/src/main/java/org/apache/falcon/service/FalconTopicSubscriber.java
@@ -35,99 +35,99 @@ import java.lang.reflect.InvocationTargetException;
import java.util.Date;
public class FalconTopicSubscriber implements MessageListener, ExceptionListener {
- private static final Logger LOG = Logger
- .getLogger(FalconTopicSubscriber.class);
-
- private TopicSubscriber subscriber;
- private String implementation;
- private String userName;
- private String password;
- private String url;
- private String topicName;
- private Connection connection;
-
- private AbstractRerunHandler retryHandler = RerunHandlerFactory
- .getRerunHandler(RerunType.RETRY);
- private AbstractRerunHandler latedataHandler = RerunHandlerFactory
- .getRerunHandler(RerunType.LATE);
-
- public FalconTopicSubscriber(String implementation, String userName,
- String password, String url, String topicName) {
- this.implementation = implementation;
- this.userName = userName;
- this.password = password;
- this.url = url;
- this.topicName = topicName;
- }
-
- public void startSubscriber() throws FalconException {
- try {
- connection = createAndGetConnection(implementation, userName,
- password, url);
- TopicSession session = (TopicSession) connection.createSession(
- false, Session.AUTO_ACKNOWLEDGE);
- Topic destination = session.createTopic(topicName);
- subscriber = session.createSubscriber(destination);
- subscriber.setMessageListener(this);
- connection.setExceptionListener(this);
- connection.start();
- } catch (Exception e) {
- LOG.error("Error starting subscriber of topic: " + this.toString(),
- e);
- throw new FalconException(e);
- }
- }
-
- @Override
- public void onMessage(Message message) {
- MapMessage mapMessage = (MapMessage) message;
- try {
- debug(mapMessage);
- String cluster = mapMessage.getString(ARG.cluster.getArgName());
- String entityName = mapMessage.getString(ARG.entityName
- .getArgName());
- String entityType = mapMessage.getString(ARG.entityType
- .getArgName());
- String workflowId = mapMessage.getString(ARG.workflowId
- .getArgName());
- String runId = mapMessage.getString(ARG.runId.getArgName());
- String nominalTime = mapMessage.getString(ARG.nominalTime
- .getArgName());
- String status = mapMessage.getString(ARG.status.getArgName());
- String operation = mapMessage.getString(ARG.operation.getArgName());
-
- AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
- InstancesResult result = wfEngine
- .getJobDetails(cluster, workflowId);
+ private static final Logger LOG = Logger
+ .getLogger(FalconTopicSubscriber.class);
+
+ private TopicSubscriber subscriber;
+ private String implementation;
+ private String userName;
+ private String password;
+ private String url;
+ private String topicName;
+ private Connection connection;
+
+ private AbstractRerunHandler retryHandler = RerunHandlerFactory
+ .getRerunHandler(RerunType.RETRY);
+ private AbstractRerunHandler latedataHandler = RerunHandlerFactory
+ .getRerunHandler(RerunType.LATE);
+
+ public FalconTopicSubscriber(String implementation, String userName,
+ String password, String url, String topicName) {
+ this.implementation = implementation;
+ this.userName = userName;
+ this.password = password;
+ this.url = url;
+ this.topicName = topicName;
+ }
+
+ public void startSubscriber() throws FalconException {
+ try {
+ connection = createAndGetConnection(implementation, userName,
+ password, url);
+ TopicSession session = (TopicSession) connection.createSession(
+ false, Session.AUTO_ACKNOWLEDGE);
+ Topic destination = session.createTopic(topicName);
+ subscriber = session.createSubscriber(destination);
+ subscriber.setMessageListener(this);
+ connection.setExceptionListener(this);
+ connection.start();
+ } catch (Exception e) {
+ LOG.error("Error starting subscriber of topic: " + this.toString(),
+ e);
+ throw new FalconException(e);
+ }
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ MapMessage mapMessage = (MapMessage) message;
+ try {
+ debug(mapMessage);
+ String cluster = mapMessage.getString(ARG.cluster.getArgName());
+ String entityName = mapMessage.getString(ARG.entityName
+ .getArgName());
+ String entityType = mapMessage.getString(ARG.entityType
+ .getArgName());
+ String workflowId = mapMessage.getString(ARG.workflowId
+ .getArgName());
+ String runId = mapMessage.getString(ARG.runId.getArgName());
+ String nominalTime = mapMessage.getString(ARG.nominalTime
+ .getArgName());
+ String status = mapMessage.getString(ARG.status.getArgName());
+ String operation = mapMessage.getString(ARG.operation.getArgName());
+
+ AbstractWorkflowEngine wfEngine = WorkflowEngineFactory.getWorkflowEngine();
+ InstancesResult result = wfEngine
+ .getJobDetails(cluster, workflowId);
Date startTime = result.getInstances()[0].startTime;
Date endTime = result.getInstances()[0].endTime;
Long duration = (endTime.getTime() - startTime.getTime()) * 1000000;
- if (status.equalsIgnoreCase("FAILED")) {
- retryHandler.handleRerun(cluster, entityType, entityName,
- nominalTime, runId, workflowId,
- System.currentTimeMillis());
- GenericAlert.instrumentFailedInstance(cluster, entityType,
- entityName, nominalTime, workflowId, runId, operation,
+ if (status.equalsIgnoreCase("FAILED")) {
+ retryHandler.handleRerun(cluster, entityType, entityName,
+ nominalTime, runId, workflowId,
+ System.currentTimeMillis());
+ GenericAlert.instrumentFailedInstance(cluster, entityType,
+ entityName, nominalTime, workflowId, runId, operation,
SchemaHelper.formatDateUTC(startTime),
"", "", duration);
- } else if (status.equalsIgnoreCase("SUCCEEDED")) {
- latedataHandler.handleRerun(cluster, entityType, entityName,
- nominalTime, runId, workflowId,
- System.currentTimeMillis());
- GenericAlert.instrumentSucceededInstance(cluster, entityType,
+ } else if (status.equalsIgnoreCase("SUCCEEDED")) {
+ latedataHandler.handleRerun(cluster, entityType, entityName,
+ nominalTime, runId, workflowId,
+ System.currentTimeMillis());
+ GenericAlert.instrumentSucceededInstance(cluster, entityType,
entityName, nominalTime, workflowId, runId, operation,
SchemaHelper.formatDateUTC(startTime),
duration);
notifySLAService(cluster, entityName, entityType, nominalTime, duration);
}
- } catch (Exception ignore) {
- LOG.info(
- "Error in onMessage for subscriber of topic: "
- + this.toString(), ignore);
- }
+ } catch (Exception ignore) {
+ LOG.info(
+ "Error in onMessage for subscriber of topic: "
+ + this.toString(), ignore);
+ }
- }
+ }
private void notifySLAService(String cluster, String entityName,
String entityType, String nominalTime, Long duration) {
@@ -157,45 +157,49 @@ public class FalconTopicSubscriber implements MessageListener, ExceptionListener
}
}
- @Override
- public void onException(JMSException ignore) {
- LOG.info(
- "Error in onException for subscriber of topic: "
- + this.toString(), ignore);
- }
-
- public void closeSubscriber() throws FalconException {
- try {
- LOG.info("Closing subscriber on topic : " + this.topicName);
- subscriber.close();
- connection.close();
- } catch (JMSException e) {
- LOG.error("Error closing subscriber of topic: " + this.toString(),
- e);
- throw new FalconException(e);
- }
- }
-
- private static Connection createAndGetConnection(String implementation,
- String userName, String password, String url) throws JMSException,
- ClassNotFoundException, IllegalArgumentException,
- SecurityException, InstantiationException, IllegalAccessException,
- InvocationTargetException, NoSuchMethodException {
-
- @SuppressWarnings("unchecked")
- Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) FalconTopicSubscriber.class
- .getClassLoader().loadClass(implementation);
-
- ConnectionFactory connectionFactory = clazz.getConstructor(
- String.class, String.class, String.class).newInstance(userName,
- password, url);
+ @Override
+ public void onException(JMSException ignore) {
+ LOG.info(
+ "Error in onException for subscriber of topic: "
+ + this.toString(), ignore);
+ }
+
+ public void closeSubscriber() throws FalconException {
+ try {
+ LOG.info("Closing subscriber on topic : " + this.topicName);
+ subscriber.close();
+ connection.close();
+ } catch (JMSException e) {
+ LOG.error("Error closing subscriber of topic: " + this.toString(),
+ e);
+ throw new FalconException(e);
+ }
+ }
+
+ private static Connection createAndGetConnection(String implementation,
+ String userName, String password, String url) throws JMSException,
+ ClassNotFoundException,
+ IllegalArgumentException,
+ SecurityException,
+ InstantiationException,
+ IllegalAccessException,
+ InvocationTargetException,
+ NoSuchMethodException {
+
+ @SuppressWarnings("unchecked")
+ Class<ConnectionFactory> clazz = (Class<ConnectionFactory>) FalconTopicSubscriber.class
+ .getClassLoader().loadClass(implementation);
+
+ ConnectionFactory connectionFactory = clazz.getConstructor(
+ String.class, String.class, String.class).newInstance(userName,
+ password, url);
return connectionFactory.createConnection();
- }
+ }
- @Override
- public String toString() {
- return topicName;
- }
+ @Override
+ public String toString() {
+ return topicName;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
index 5f7541e..9d5e78f 100644
--- a/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
+++ b/prism/src/main/java/org/apache/falcon/service/ProcessSubscriberService.java
@@ -24,44 +24,44 @@ public class ProcessSubscriberService implements FalconService {
private FalconTopicSubscriber subscriber;
- private static enum JMSprops {
- FalconBrokerImplClass("broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory"),
- FalconBrokerUrl("broker.url", "tcp://localhost:61616?daemon=true"),
- FalconEntityTopic("entity.topic", "FALCON.ENTITY.TOPIC");
+ private static enum JMSprops {
+ FalconBrokerImplClass("broker.impl.class", "org.apache.activemq.ActiveMQConnectionFactory"),
+ FalconBrokerUrl("broker.url", "tcp://localhost:61616?daemon=true"),
+ FalconEntityTopic("entity.topic", "FALCON.ENTITY.TOPIC");
- private String propName;
- private String defaultPropValue;
+ private String propName;
+ private String defaultPropValue;
- private JMSprops(String propName, String defaultPropValue) {
- this.propName = propName;
- this.defaultPropValue = defaultPropValue;
- }
+ private JMSprops(String propName, String defaultPropValue) {
+ this.propName = propName;
+ this.defaultPropValue = defaultPropValue;
+ }
- }
+ }
- @Override
- public String getName() {
- return ProcessSubscriberService.class.getSimpleName();
- }
+ @Override
+ public String getName() {
+ return ProcessSubscriberService.class.getSimpleName();
+ }
- @Override
- public void init() throws FalconException {
+ @Override
+ public void init() throws FalconException {
String falconBrokerImplClass = getPropertyValue(JMSprops.FalconBrokerImplClass);
String falconBrokerUrl = getPropertyValue(JMSprops.FalconBrokerUrl);
String falconEntityTopic = getPropertyValue(JMSprops.FalconEntityTopic);
- subscriber = new FalconTopicSubscriber(falconBrokerImplClass, "", "",
+ subscriber = new FalconTopicSubscriber(falconBrokerImplClass, "", "",
falconBrokerUrl, falconEntityTopic);
- subscriber.startSubscriber();
- }
+ subscriber.startSubscriber();
+ }
- private String getPropertyValue(JMSprops prop) {
- return StartupProperties.get().getProperty(prop.propName,
- prop.defaultPropValue);
- }
+ private String getPropertyValue(JMSprops prop) {
+ return StartupProperties.get().getProperty(prop.propName,
+ prop.defaultPropValue);
+ }
- @Override
- public void destroy() throws FalconException {
- subscriber.closeSubscriber();
- }
+ @Override
+ public void destroy() throws FalconException {
+ subscriber.closeSubscriber();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java b/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
index 71a6c3f..da7887e 100644
--- a/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
+++ b/prism/src/main/java/org/apache/falcon/service/SLAMonitoringService.java
@@ -118,7 +118,7 @@ public class SLAMonitoringService implements FalconService, WorkflowEngineAction
private void removeFromPendingList(Entity entity, String cluster, Date nominalTime) {
ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(getKey(entity, cluster));
if (pendingInstances != null) {
- LOG.debug("Removing from pending jobs: " + getKey(entity, cluster) + " ---> " +
+ LOG.debug("Removing from pending jobs: " + getKey(entity, cluster) + " ---> " +
SchemaHelper.formatDateUTC(nominalTime));
pendingInstances.remove(nominalTime);
}
@@ -129,7 +129,9 @@ public class SLAMonitoringService implements FalconService, WorkflowEngineAction
@Override
public void run() {
try {
- if (monitoredEntities.isEmpty()) return;
+ if (monitoredEntities.isEmpty()) {
+ return;
+ }
Set<String> keys = new HashSet<String>(monitoredEntities.keySet());
checkSLAMissOnPendingEntities(keys);
addNewPendingEntities(keys);
@@ -142,7 +144,9 @@ public class SLAMonitoringService implements FalconService, WorkflowEngineAction
Date now = new Date();
for (String key : keys) {
ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(key);
- if (pendingInstances == null) continue;
+ if (pendingInstances == null) {
+ continue;
+ }
ConcurrentMap<Date, Date> interim =
new ConcurrentHashMap<Date, Date>(pendingInstances);
for (Map.Entry<Date, Date> entry : interim.entrySet()) {
@@ -174,7 +178,9 @@ public class SLAMonitoringService implements FalconService, WorkflowEngineAction
Frequency frequency = EntityUtil.getFrequency(entity);
TimeZone timeZone = EntityUtil.getTimeZone(entity);
Date nextStart = EntityUtil.getNextStartTime(startTime, frequency, timeZone, now);
- if (nextStart.after(windowEndTime)) continue;
+ if (nextStart.after(windowEndTime)) {
+ continue;
+ }
ConcurrentMap<Date, Date> pendingInstances = pendingJobs.get(key);
while (!nextStart.after(windowEndTime)) {
if (pendingInstances == null) {
@@ -182,7 +188,9 @@ public class SLAMonitoringService implements FalconService, WorkflowEngineAction
pendingInstances = pendingJobs.get(key);
}
Long latency = monitoredEntities.get(key);
- if (latency == null) break;
+ if (latency == null) {
+ break;
+ }
pendingInstances.putIfAbsent(nextStart, new Date(nextStart.getTime() +
latency * 1500)); //1.5 times latency is when it is supposed to have breached
LOG.debug("Adding to pending jobs: " + key + " ---> " +
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java
----------------------------------------------------------------------
diff --git a/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java b/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java
index 34a7f9f..6baeb13 100644
--- a/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java
+++ b/prism/src/main/java/org/apache/falcon/util/EmbeddedServer.java
@@ -25,24 +25,24 @@ import org.mortbay.jetty.bio.SocketConnector;
import org.mortbay.jetty.webapp.WebAppContext;
public class EmbeddedServer {
- private Server server = new Server();
- private Connector connector = new SocketConnector();
+ private Server server = new Server();
+ private Connector connector = new SocketConnector();
- public EmbeddedServer(int port, String path) {
- connector.setPort(port);
- connector.setHost("0.0.0.0");
- server.addConnector(connector);
+ public EmbeddedServer(int port, String path) {
+ connector.setPort(port);
+ connector.setHost("0.0.0.0");
+ server.addConnector(connector);
- WebAppContext application = new WebAppContext(path, "/");
- server.setHandler(application);
- }
+ WebAppContext application = new WebAppContext(path, "/");
+ server.setHandler(application);
+ }
- public void start() throws Exception {
- Services.get().reset();
- server.start();
- }
+ public void start() throws Exception {
+ Services.get().reset();
+ server.start();
+ }
- public void stop() throws Exception {
- server.stop();
- }
+ public void stop() throws Exception {
+ server.stop();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/prism/src/main/resources/log4j.xml b/prism/src/main/resources/log4j.xml
index 299143b..ea08108 100644
--- a/prism/src/main/resources/log4j.xml
+++ b/prism/src/main/resources/log4j.xml
@@ -20,72 +20,72 @@
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
- <appender name="console" class="org.apache.log4j.ConsoleAppender">
- <param name="Target" value="System.out"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
- </layout>
- </appender>
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
+ </layout>
+ </appender>
- <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="/var/log/falcon/prism.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
- </layout>
- </appender>
+ <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="/var/log/falcon/prism.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
+ </layout>
+ </appender>
+
+ <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="/var/log/falcon/prism-audit.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %x %m%n"/>
+ </layout>
+ </appender>
+
+ <appender name="TRANSACTIONLOG" class="org.apache.log4j.DailyRollingFileAppender">
+ <param name="File" value="/var/log/falcon/prsim-tranlog.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %x %m%n"/>
+ </layout>
+ </appender>
- <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="/var/log/falcon/prism-audit.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %x %m%n"/>
- </layout>
- </appender>
-
- <appender name="TRANSACTIONLOG" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="/var/log/falcon/prsim-tranlog.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %x %m%n"/>
- </layout>
- </appender>
-
<appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender">
- <param name="File" value="/var/log/falcon/prism-metric.log"/>
- <param name="Append" value="true"/>
- <param name="Threshold" value="debug"/>
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %m%n"/>
- </layout>
- </appender>
+ <param name="File" value="/var/log/falcon/prism-metric.log"/>
+ <param name="Append" value="true"/>
+ <param name="Threshold" value="debug"/>
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %m%n"/>
+ </layout>
+ </appender>
+
+ <logger name="org.apache.falcon">
+ <level value="debug"/>
+ <appender-ref ref="FILE"/>
+ </logger>
+
+ <logger name="AUDIT">
+ <level value="info"/>
+ <appender-ref ref="AUDIT"/>
+ </logger>
- <logger name="org.apache.falcon">
- <level value="debug"/>
- <appender-ref ref="FILE" />
- </logger>
+ <logger name="TRANSACTIONLOG">
+ <level value="info"/>
+ <appender-ref ref="TRANSACTIONLOG"/>
+ </logger>
- <logger name="AUDIT">
- <level value="info"/>
- <appender-ref ref="AUDIT" />
- </logger>
-
- <logger name="TRANSACTIONLOG">
- <level value="info"/>
- <appender-ref ref="TRANSACTIONLOG" />
- </logger>
-
<logger name="METRIC">
- <level value="info"/>
- <appender-ref ref="METRIC" />
- </logger>
+ <level value="info"/>
+ <appender-ref ref="METRIC"/>
+ </logger>
- <root>
- <priority value ="info" />
- <appender-ref ref="console" />
- </root>
+ <root>
+ <priority value="info"/>
+ <appender-ref ref="console"/>
+ </root>
</log4j:configuration>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/prism/src/main/webapp/WEB-INF/web.xml b/prism/src/main/webapp/WEB-INF/web.xml
index 8a9406a..00a6c42 100644
--- a/prism/src/main/webapp/WEB-INF/web.xml
+++ b/prism/src/main/webapp/WEB-INF/web.xml
@@ -25,37 +25,39 @@
<display-name>Apache Falcon Prism</display-name>
<description>Apache Falcon Prism</description>
- <filter>
- <filter-name>auth</filter-name>
- <filter-class>org.apache.falcon.security.BasicAuthFilter</filter-class>
- </filter>
-
- <filter-mapping>
- <filter-name>auth</filter-name>
- <servlet-name>FalconProxyAPI</servlet-name>
- </filter-mapping>
-
- <listener>
- <listener-class>org.apache.falcon.listener.ContextStartupListener</listener-class>
- </listener>
-
- <servlet>
- <servlet-name>FalconProxyAPI</servlet-name>
- <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
- <init-param>
- <param-name>com.sun.jersey.config.property.resourceConfigClass</param-name>
- <param-value>com.sun.jersey.api.core.PackagesResourceConfig</param-value>
- </init-param>
- <init-param>
- <param-name>com.sun.jersey.config.property.packages</param-name>
- <param-value>org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy</param-value>
- </init-param>
- <load-on-startup>1</load-on-startup>
- </servlet>
-
- <servlet-mapping>
- <servlet-name>FalconProxyAPI</servlet-name>
- <url-pattern>/api/*</url-pattern>
- </servlet-mapping>
+ <filter>
+ <filter-name>auth</filter-name>
+ <filter-class>org.apache.falcon.security.BasicAuthFilter</filter-class>
+ </filter>
+
+ <filter-mapping>
+ <filter-name>auth</filter-name>
+ <servlet-name>FalconProxyAPI</servlet-name>
+ </filter-mapping>
+
+ <listener>
+ <listener-class>org.apache.falcon.listener.ContextStartupListener</listener-class>
+ </listener>
+
+ <servlet>
+ <servlet-name>FalconProxyAPI</servlet-name>
+ <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+ <init-param>
+ <param-name>com.sun.jersey.config.property.resourceConfigClass</param-name>
+ <param-value>com.sun.jersey.api.core.PackagesResourceConfig</param-value>
+ </init-param>
+ <init-param>
+ <param-name>com.sun.jersey.config.property.packages</param-name>
+ <param-value>
+ org.apache.falcon.resource.admin,org.apache.falcon.resource.provider,org.apache.falcon.resource.proxy
+ </param-value>
+ </init-param>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+
+ <servlet-mapping>
+ <servlet-name>FalconProxyAPI</servlet-name>
+ <url-pattern>/api/*</url-pattern>
+ </servlet-mapping>
</web-app>
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java b/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java
index b2436af..979c349 100644
--- a/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java
+++ b/prism/src/test/java/org/apache/falcon/aspect/GenericAlertTest.java
@@ -22,11 +22,11 @@ import org.testng.annotations.Test;
@Aspect
public class GenericAlertTest {
-
- @Test
- public void testWfInstanceFailedAlert() throws Exception{
- GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df", "ef-id", "1",
+
+ @Test
+ public void testWfInstanceFailedAlert() throws Exception {
+ GenericAlert.instrumentFailedInstance("cluster", "process", "agg-coord", "120:df", "ef-id", "1",
"DELETE", "now", "error", "none", 1242);
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/test/java/org/apache/falcon/aspect/LoggingAspectTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/aspect/LoggingAspectTest.java b/prism/src/test/java/org/apache/falcon/aspect/LoggingAspectTest.java
index 2c2a8ed..2e04904 100644
--- a/prism/src/test/java/org/apache/falcon/aspect/LoggingAspectTest.java
+++ b/prism/src/test/java/org/apache/falcon/aspect/LoggingAspectTest.java
@@ -18,67 +18,66 @@
package org.apache.falcon.aspect;
-import java.util.ArrayList;
-import java.util.List;
-
import junit.framework.Assert;
-
import org.apache.falcon.FalconWebException;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractEntityManager;
import org.testng.annotations.Test;
+import java.util.ArrayList;
+import java.util.List;
+
public class LoggingAspectTest {
-
- private AbstractEntityManager em = new AbstractEntityManager() {
+
+ private AbstractEntityManager em = new AbstractEntityManager() {
};
- private volatile Exception threadException;
+ private volatile Exception threadException;
+
+ @Test(expectedExceptions = FalconWebException.class)
+ public void testBeanLoading() {
- @Test(expectedExceptions=FalconWebException.class)
- public void testBeanLoading() {
+ APIResult result = em.getStatus("type", "entity", "colo");
+ }
- APIResult result = em.getStatus("type", "entity", "colo");
- }
+ @Test
+ public void testExceptionBeanLoading() {
+ try {
+ em.getStatus("type", "entity", "colo");
+ System.out.println();
+
+ } catch (Exception e) {
+ return;
+ }
+ Assert.fail("Exepected excpetion");
+ }
- @Test
- public void testExceptionBeanLoading() {
- try {
- em.getStatus("type", "entity", "colo");
- System.out.println();
-
- } catch (Exception e) {
- return;
- }
- Assert.fail("Exepected excpetion");
- }
-
- @Test(expectedExceptions=FalconWebException.class)
- public void testConcurrentRequests() throws Exception{
+ @Test(expectedExceptions = FalconWebException.class)
+ public void testConcurrentRequests() throws Exception {
List<Thread> threadList = new ArrayList<Thread>();
for (int i = 0; i < 5; i++) {
threadList.add(new Thread() {
public void run() {
try {
- testBeanLoading();
+ testBeanLoading();
} catch (Exception e) {
- e.printStackTrace();
- threadException =e;
+ e.printStackTrace();
+ threadException = e;
throw new RuntimeException(e);
}
}
});
}
-
- for(Thread thread:threadList) {
+
+ for (Thread thread : threadList) {
thread.start();
thread.join();
}
-
- if (threadException != null) {
- throw threadException;
- }
- }
+
+ if (threadException != null) {
+ throw threadException;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-falcon/blob/a4d79f0c/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
----------------------------------------------------------------------
diff --git a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
index 5954815..24e5a8a 100644
--- a/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
+++ b/prism/src/test/java/org/apache/falcon/resource/EntityManagerTest.java
@@ -54,7 +54,7 @@ public class EntityManagerTest extends AbstractEntityManager {
@DataProvider(name = "validXMLServletStreamProvider")
private Object[][] servletStreamProvider() {
ServletInputStream validProcessXML = getServletInputStream(SAMPLE_PROCESS_XML);
- return new Object[][] { { EntityType.PROCESS, validProcessXML },
+ return new Object[][]{{EntityType.PROCESS, validProcessXML},
};
}
@@ -97,7 +97,8 @@ public class EntityManagerTest extends AbstractEntityManager {
validate(mockHttpServletRequest,
"InvalidEntityType");
Assert.fail("Invalid entity type was accepted by the system");
- } catch (FalconWebException ignore) {}
+ } catch (FalconWebException ignore) {
+ }
}
/**