You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by tb...@apache.org on 2014/05/02 18:24:53 UTC
[6/7] AMBARI-5616 - Ambari Views: Pig view (Roman Rader via
tbeerbower)
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobService.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobService.java
new file mode 100644
index 0000000..23705e9
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/JobService.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.jobs;
+
+import com.google.inject.Inject;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.pig.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.pig.resources.files.FileResource;
+import org.apache.ambari.view.pig.resources.jobs.models.PigJob;
+import org.apache.ambari.view.pig.services.BaseService;
+import org.apache.ambari.view.pig.utils.FilePaginator;
+import org.json.simple.JSONObject;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.*;
+import javax.xml.ws.WebServiceException;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * Servlet for Pig Jobs
+ * API:
+ * GET /:id
+ * read job info
+ * POST /
+ * create new job
+ * Required: scriptId
+ * Optional: params
+ * GET /
+ * get all jobs of current user
+ * GET /:id/notify
+ * callback from Templeton
+ */
+public class JobService extends BaseService {
+ @Inject
+ ViewResourceHandler handler;
+
+ protected JobResourceManager resourceManager = null;
+
+ public synchronized JobResourceManager getResourceManager() {
+ if (resourceManager == null) {
+ resourceManager = new JobResourceManager(context);
+ }
+ return resourceManager;
+ }
+
+ public synchronized void setResourceManager(JobResourceManager resourceManager) {
+ this.resourceManager = resourceManager;
+ }
+
+ /**
+ * Get single item
+ */
+ @GET
+ @Path("{jobId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJob(@PathParam("jobId") String jobId) {
+ PigJob job = null;
+ try {
+ job = getResourceManager().read(jobId);
+ } catch (ItemNotFound itemNotFound) {
+ return Response.status(404).build();
+ }
+ getResourceManager().retrieveJobStatus(job);
+ JSONObject object = new JSONObject();
+ object.put("job", job);
+ return Response.ok(object).build();
+ }
+
+ /**
+ * Get single item
+ */
+ @DELETE
+ @Path("{jobId}")
+ public Response killJob(@PathParam("jobId") String jobId) throws IOException {
+ PigJob job = null;
+ try {
+ job = getResourceManager().read(jobId);
+ } catch (ItemNotFound itemNotFound) {
+ return Response.status(404).build();
+ }
+ getResourceManager().killJob(job);
+ return Response.status(204).build();
+ }
+
+ /**
+ * Callback from templeton
+ */
+ @GET
+ @Path("{jobId}/notify")
+ public Response jobCompletionNotification(@Context HttpHeaders headers,
+ @Context UriInfo ui,
+ @PathParam("jobId") final String jobId) {
+ PigJob job = null;
+ try {
+ job = getResourceManager().ignorePermissions(new Callable<PigJob>() {
+ public PigJob call() throws Exception {
+ PigJob job = null;
+ try {
+ job = getResourceManager().read(jobId);
+ } catch (ItemNotFound itemNotFound) {
+ return null;
+ }
+ return job;
+ }
+ });
+ } catch (Exception e) {
+ return Response.status(500).build();
+ }
+ if (job == null)
+ return Response.status(404).build();
+
+ getResourceManager().retrieveJobStatus(job);
+ return Response.ok().build();
+ }
+
+ @GET
+ @Path("{jobId}/results/{fileName}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response jobExitCode(@Context HttpHeaders headers,
+ @Context UriInfo ui,
+ @PathParam("jobId") String jobId,
+ @PathParam("fileName") String fileName,
+ @QueryParam("page") Long page) {
+ PigJob job = null;
+ try {
+ job = getResourceManager().read(jobId);
+ } catch (ItemNotFound itemNotFound) {
+ return Response.ok("No such job").status(404).build();
+ }
+ try {
+ String filePath = job.getStatusDir() + "/" + fileName;
+ LOG.debug("Reading file " + filePath);
+ FilePaginator paginator = new FilePaginator(filePath, context);
+
+ if (page == null)
+ page = 0L;
+
+ FileResource file = new FileResource();
+ file.filePath = filePath;
+ file.fileContent = paginator.readPage(page);
+ file.hasNext = paginator.pageCount() > page + 1;
+ file.page = page;
+ file.pageCount = paginator.pageCount();
+
+ JSONObject object = new JSONObject();
+ object.put("file", file);
+ return Response.ok(object).status(200).build();
+ } catch (IOException e) {
+ return Response.ok(e.getMessage()).status(404).build();
+ } catch (InterruptedException e) {
+ return Response.ok(e.getMessage()).status(404).build();
+ }
+ }
+
+ /**
+ * Get all jobs
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobList(@Context HttpHeaders headers, @Context UriInfo ui) {
+ List allJobs = getResourceManager().readAll(
+ new OnlyOwnersFilteringStrategy(this.context.getUsername()));
+
+ JSONObject object = new JSONObject();
+ object.put("jobs", allJobs);
+ return Response.ok(object).build();
+ }
+
+ /**
+ * Create job
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response runJob(PigJobRequest request, @Context HttpServletResponse response,
+ @Context UriInfo ui) {
+ if (!request.validatePOST()) {
+ return badRequestResponse(request.explainPOST());
+ }
+ try {
+ getResourceManager().create(request.job);
+ } catch (IllegalArgumentException e) {
+ return badRequestResponse(e.getMessage());
+ } catch (WebServiceException e) {
+ return serverErrorResponse(e.getMessage());
+ }
+
+ PigJob job = null;
+
+ try {
+ job = getResourceManager().read(request.job.getId());
+ } catch (ItemNotFound itemNotFound) {
+ return Response.status(404).build();
+ }
+
+ response.setHeader("Location",
+ String.format("%s/%s", ui.getAbsolutePath().toString(), request.job.getId()));
+
+ JSONObject object = new JSONObject();
+ object.put("job", job);
+ return Response.ok(object).status(201).build();
+ }
+
+ public static class PigJobRequest {
+ public PigJob job;
+
+ public String explainPOST() {
+ StringBuilder result = new StringBuilder();
+ if ((job.getPigScript() == null || job.getPigScript().isEmpty()) &&
+ (job.getForcedContent() == null || job.getForcedContent().isEmpty()))
+ result.append("No pigScript file or forcedContent specifed;");
+ if (job.getTitle() == null || job.getTitle().isEmpty())
+ result.append("No title specifed;");
+ if (job.getId() != null && !job.getTitle().isEmpty())
+ result.append("ID should not exists in creation request;");
+ return result.toString();
+ }
+
+ public boolean validatePOST() {
+ return explainPOST().isEmpty();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/models/PigJob.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/models/PigJob.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/models/PigJob.java
new file mode 100644
index 0000000..e49c267
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/models/PigJob.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.jobs.models;
+
+import org.apache.ambari.view.pig.persistence.utils.PersonalResource;
+import org.apache.commons.beanutils.BeanUtils;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+/**
+ * Bean to represent Pig job
+ *
+ * Job lifecycle:
+ * SUBMITTING
+ *     |
+ *   [POST to Templeton]
+ *     |            |
+ * SUBMITTED   SUBMIT_FAILED
+ *     |
+ *     |
+ *   [GET result from job/:job_id]
+ *     |          |        |
+ * COMPLETED    KILLED   FAILED
+ */
+public class PigJob implements Serializable, PersonalResource {
+
+  public enum Status {
+    UNKNOWN,
+    SUBMITTING, SUBMITTED, RUNNING,  // in progress
+    SUBMIT_FAILED, COMPLETED, FAILED, KILLED  // finished
+  }
+
+  /**
+   * @return true while the job is in SUBMITTING, SUBMITTED or RUNNING status
+   */
+  public boolean isInProgress() {
+    return status == Status.SUBMITTED || status == Status.SUBMITTING ||
+        status == Status.RUNNING;
+  }
+
+  // Numeric run states; presumably the codes reported by Templeton/Hadoop
+  // for a job - TODO confirm against the code that consumes them.
+  public static final int RUN_STATE_RUNNING = 1;
+  public static final int RUN_STATE_SUCCEEDED = 2;
+  public static final int RUN_STATE_FAILED = 3;
+  public static final int RUN_STATE_PREP = 4;
+  public static final int RUN_STATE_KILLED = 5;
+
+  public PigJob() {
+  }
+
+  /**
+   * Creates the bean and populates its properties from the map
+   * (property name to value) using BeanUtils.
+   */
+  public PigJob(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
+    BeanUtils.populate(this, stringObjectMap);
+  }
+
+  String id = null;
+  String scriptId = null;
+
+  // cloned script data
+  String pigScript = null;
+  String pythonScript = null;
+  String title = null;
+  String templetonArguments = null;
+  String owner;
+
+  // job info
+  String forcedContent = null;
+
+  /**
+   * jobType possible values:
+   * null - regular execute
+   * "explain"
+   * "syntax_check"
+   */
+  String jobType = null;
+
+  /**
+   * Additional file to use in Explain job
+   */
+  String sourceFile = null;
+  String sourceFileContent = null;
+
+  String statusDir;
+  Long dateStarted = 0L;
+  String jobId = null;
+
+  // status fields (not reliable)
+  Status status = Status.UNKNOWN;
+  Integer percentComplete = null;
+
+  /**
+   * Jobs are equal when their ids are equal. The id is null until it is
+   * assigned, so the comparison is null-safe: two jobs without an id
+   * compare equal.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (!(o instanceof PigJob)) return false;
+
+    PigJob other = (PigJob) o;
+
+    return id == null ? other.id == null : id.equals(other.id);
+  }
+
+  /**
+   * Hash of the id; 0 while the id is not assigned yet.
+   */
+  @Override
+  public int hashCode() {
+    return id == null ? 0 : id.hashCode();
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  @Override
+  public String getOwner() {
+    return owner;
+  }
+
+  @Override
+  public void setOwner(String owner) {
+    this.owner = owner;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public void setStatus(Status status) {
+    this.status = status;
+  }
+
+  public String getScriptId() {
+    return scriptId;
+  }
+
+  public void setScriptId(String scriptId) {
+    this.scriptId = scriptId;
+  }
+
+  public String getTempletonArguments() {
+    return templetonArguments;
+  }
+
+  public void setTempletonArguments(String templetonArguments) {
+    this.templetonArguments = templetonArguments;
+  }
+
+  public String getPigScript() {
+    return pigScript;
+  }
+
+  public void setPigScript(String pigScript) {
+    this.pigScript = pigScript;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public void setJobId(String jobId) {
+    this.jobId = jobId;
+  }
+
+  public String getTitle() {
+    return title;
+  }
+
+  public void setTitle(String title) {
+    this.title = title;
+  }
+
+  public void setStatusDir(String statusDir) {
+    this.statusDir = statusDir;
+  }
+
+  public String getStatusDir() {
+    return statusDir;
+  }
+
+  public Long getDateStarted() {
+    return dateStarted;
+  }
+
+  public void setDateStarted(Long dateStarted) {
+    this.dateStarted = dateStarted;
+  }
+
+  public Integer getPercentComplete() {
+    return percentComplete;
+  }
+
+  public void setPercentComplete(Integer percentComplete) {
+    this.percentComplete = percentComplete;
+  }
+
+  public String getPythonScript() {
+    return pythonScript;
+  }
+
+  public void setPythonScript(String pythonScript) {
+    this.pythonScript = pythonScript;
+  }
+
+  public String getForcedContent() {
+    return forcedContent;
+  }
+
+  public void setForcedContent(String forcedContent) {
+    this.forcedContent = forcedContent;
+  }
+
+  public String getJobType() {
+    return jobType;
+  }
+
+  public void setJobType(String jobType) {
+    this.jobType = jobType;
+  }
+
+  public String getSourceFileContent() {
+    return sourceFileContent;
+  }
+
+  public void setSourceFileContent(String sourceFileContent) {
+    this.sourceFileContent = sourceFileContent;
+  }
+
+  public String getSourceFile() {
+    return sourceFile;
+  }
+
+  public void setSourceFile(String sourceFile) {
+    this.sourceFile = sourceFile;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/utils/JobPolling.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/utils/JobPolling.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/utils/JobPolling.java
new file mode 100644
index 0000000..31eabae
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/jobs/utils/JobPolling.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.jobs.utils;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.pig.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.pig.resources.jobs.JobResourceManager;
+import org.apache.ambari.view.pig.resources.jobs.models.PigJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Observable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Polling manager
+ * Makes scheduled repeated polling of templeton to
+ * be aware of happen events like job finished,
+ * killed, changed progress and so on.
+ */
+public class JobPolling implements Runnable {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(JobPolling.class);
+
+ /**
+ * We should limit count of concurrent calls to templeton
+ * to avoid high load on component
+ */
+ private static final int WORKER_COUNT = 2;
+
+ private static final int POLLING_DELAY = 10*60; // 10 minutes
+
+ /**
+ * In LONG_JOB_THRESHOLD seconds job reschedules polling from POLLING_DELAY to LONG_POLLING_DELAY
+ */
+ private static final int LONG_POLLING_DELAY = 60*60; // 1 hour
+ private static final int LONG_JOB_THRESHOLD = 60*60; // 1 hour
+
+ private static final ScheduledExecutorService pollWorkersPool = Executors.newScheduledThreadPool(WORKER_COUNT);
+
+ private static final Map<String, JobPolling> jobPollers = new HashMap<String, JobPolling>();
+
+ private JobResourceManager resourceManager = null;
+ private final ViewContext context;
+ private PigJob job;
+ private volatile ScheduledFuture<?> thisFuture;
+
+ private JobPolling(ViewContext context, PigJob job) {
+ this.context = context;
+ this.job = job;
+ }
+
+ protected synchronized JobResourceManager getResourceManager() {
+ if (resourceManager == null) {
+ resourceManager = new JobResourceManager(context);
+ }
+ return resourceManager;
+ }
+
+ public void run() {
+ LOG.debug("Polling job status " + job.getJobId() + " #" + job.getId());
+ try {
+ job = getResourceManager().read(job.getId());
+ } catch (ItemNotFound itemNotFound) {
+ LOG.error("Job " + job.getJobId() + " does not exist! Polling canceled");
+ thisFuture.cancel(false);
+ return;
+ }
+ getResourceManager().retrieveJobStatus(job);
+
+ Long time = System.currentTimeMillis() / 1000L;
+ if (time - job.getDateStarted() > LONG_JOB_THRESHOLD) {
+ LOG.debug("Job becomes long.. Rescheduling polling to longer period");
+ // If job running longer than LONG_JOB_THRESHOLD, reschedule
+ // it to poll every LONG_POLLING_DELAY instead of POLLING_DELAY
+ thisFuture.cancel(false);
+ scheduleJobPolling(true);
+ }
+
+ switch (job.getStatus()) {
+ case SUBMIT_FAILED:
+ case COMPLETED:
+ case FAILED:
+ case KILLED:
+ LOG.debug("Job finished. Polling canceled");
+ thisFuture.cancel(false);
+ break;
+ default:
+ }
+ }
+
+ private void scheduleJobPolling(boolean longDelay) {
+ if (!longDelay) {
+ thisFuture = pollWorkersPool.scheduleWithFixedDelay(this,
+ POLLING_DELAY, POLLING_DELAY, TimeUnit.SECONDS);
+ } else {
+ thisFuture = pollWorkersPool.scheduleWithFixedDelay(this,
+ LONG_POLLING_DELAY, LONG_POLLING_DELAY, TimeUnit.SECONDS);
+ }
+ }
+
+ private void scheduleJobPolling() {
+ scheduleJobPolling(false);
+ }
+
+ /**
+ * Schedule job polling
+ * @param context ViewContext of web app
+ * @param job job instance
+ * @return returns false if already scheduled
+ */
+ public static boolean pollJob(ViewContext context, PigJob job) {
+ if (jobPollers.get(job.getJobId()) == null) {
+ LOG.debug("Setting up polling for " + job.getJobId());
+ JobPolling polling = new JobPolling(context, job);
+ polling.scheduleJobPolling();
+ jobPollers.put(job.getJobId(), polling);
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceManager.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceManager.java
new file mode 100644
index 0000000..9714d27
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceManager.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.scripts;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.pig.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager;
+import org.apache.ambari.view.pig.resources.scripts.models.PigScript;
+import org.apache.ambari.view.pig.services.BaseService;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.ws.WebServiceException;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class ScriptResourceManager extends PersonalCRUDResourceManager<PigScript> {
+ private final static Logger LOG =
+ LoggerFactory.getLogger(ScriptResourceManager.class);
+
+ public ScriptResourceManager(ViewContext context) {
+ super(PigScript.class, context);
+ }
+
+ @Override
+ public PigScript create(PigScript object) {
+ super.create(object);
+ if (object.getPigScript() == null || object.getPigScript().isEmpty()) {
+ createDefaultScriptFile(object);
+ }
+ return object;
+ }
+
+ private void createDefaultScriptFile(PigScript object) {
+ String userScriptsPath = context.getProperties().get("dataworker.userScriptsPath");
+ if (userScriptsPath == null) {
+ String msg = "dataworker.userScriptsPath is not configured!";
+ LOG.error(msg);
+ throw new WebServiceException(msg);
+ }
+ int checkId = 0;
+
+ boolean fileCreated;
+ String newFilePath;
+ do {
+ String normalizedName = object.getTitle().replaceAll("[^a-zA-Z0-9 ]+", "").replaceAll(" ", "_").toLowerCase();
+ String timestamp = new SimpleDateFormat("yyyy-MM-dd_hh-mm").format(new Date());
+ newFilePath = String.format(userScriptsPath +
+ "/%s/%s-%s%s.pig", context.getUsername(),
+ normalizedName, timestamp, (checkId == 0)?"":"_"+checkId);
+ LOG.debug("Trying to create new file " + newFilePath);
+
+ try {
+ FSDataOutputStream stream = BaseService.getHdfsApi(context).create(newFilePath, false);
+ stream.close();
+ fileCreated = true;
+ LOG.debug("File created successfully!");
+ } catch (FileAlreadyExistsException e) {
+ fileCreated = false;
+ LOG.debug("File already exists. Trying next id");
+ } catch (IOException e) {
+ try {
+ delete(object.getId());
+ } catch (ItemNotFound itemNotFound) {
+ throw new WebServiceException("Error in creation, during clean up: " + itemNotFound.toString(), itemNotFound);
+ }
+ throw new WebServiceException("Error in creation: " + e.toString(), e);
+ } catch (InterruptedException e) {
+ try {
+ delete(object.getId());
+ } catch (ItemNotFound itemNotFound) {
+ throw new WebServiceException("Error in creation, during clean up: " + itemNotFound.toString(), itemNotFound);
+ }
+ throw new WebServiceException("Error in creation: " + e.toString(), e);
+ }
+ checkId += 1;
+ } while (!fileCreated);
+
+ object.setPigScript(newFilePath);
+ getPigStorage().store(object);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceProvider.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceProvider.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceProvider.java
new file mode 100644
index 0000000..478a460
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptResourceProvider.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.scripts;
+
+import com.google.inject.Inject;
+import org.apache.ambari.view.*;
+import org.apache.ambari.view.pig.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager;
+import org.apache.ambari.view.pig.resources.scripts.models.PigScript;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.*;
+
+public class ScriptResourceProvider implements ResourceProvider<PigScript> {
+ @Inject
+ ViewContext context;
+
+ protected ScriptResourceManager resourceManager = null;
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(ScriptResourceProvider.class);
+
+ protected synchronized PersonalCRUDResourceManager<PigScript> getResourceManager() {
+ if (resourceManager == null) {
+ resourceManager = new ScriptResourceManager(context);
+ }
+ return resourceManager;
+ }
+
+ @Override
+ public PigScript getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ try {
+ return getResourceManager().read(resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ }
+
+ @Override
+ public Set<PigScript> getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ return new HashSet<PigScript>(getResourceManager().readAll(
+ new OnlyOwnersFilteringStrategy(this.context.getUsername())));
+ }
+
+ @Override
+ public void createResource(String s, Map<String, Object> stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException {
+ PigScript script = null;
+ try {
+ script = new PigScript(stringObjectMap);
+ } catch (InvocationTargetException e) {
+ throw new SystemException("error on creating resource", e);
+ } catch (IllegalAccessException e) {
+ throw new SystemException("error on creating resource", e);
+ }
+ getResourceManager().create(script);
+ }
+
+ @Override
+ public boolean updateResource(String resourceId, Map<String, Object> stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ PigScript script = null;
+ try {
+ script = new PigScript(stringObjectMap);
+ } catch (InvocationTargetException e) {
+ throw new SystemException("error on updating resource", e);
+ } catch (IllegalAccessException e) {
+ throw new SystemException("error on updating resource", e);
+ }
+ try {
+ getResourceManager().update(script, resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ try {
+ getResourceManager().delete(resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptService.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptService.java
new file mode 100644
index 0000000..c07f985
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/ScriptService.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.scripts;
+
+import com.google.inject.Inject;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.pig.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager;
+import org.apache.ambari.view.pig.resources.scripts.models.PigScript;
+import org.apache.ambari.view.pig.services.BaseService;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.*;
+import java.util.List;
+
+/**
+ * Servlet for scripts
+ * API:
+ * GET /:id
+ *      read script
+ * PUT /:id
+ *      update script
+ * DELETE /:id
+ *      delete script
+ * POST /
+ *      create new script
+ *      Required: script
+ *      If pigScript is empty, a default script file is created
+ * GET /
+ *      get all scripts of current user
+ */
+public class ScriptService extends BaseService {
+  // Injected handler; not referenced by any method of this class.
+  @Inject
+  ViewResourceHandler handler;
+
+  protected ScriptResourceManager resourceManager = null;
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(ScriptService.class);
+
+  /**
+   * Returns the script resource manager, creating it on first use.
+   */
+  protected synchronized PersonalCRUDResourceManager<PigScript> getResourceManager() {
+    if (resourceManager == null) {
+      resourceManager = new ScriptResourceManager(context);
+    }
+    return resourceManager;
+  }
+
+  /**
+   * Get single item
+   * @return 200 with {"script": ...}, or 404 if the script is not found
+   */
+  @GET
+  @Path("{scriptId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getScript(@PathParam("scriptId") String scriptId) {
+    PigScript script = null;
+    try {
+      script = getResourceManager().read(scriptId);
+    } catch (ItemNotFound itemNotFound) {
+      return Response.status(404).build();
+    }
+    JSONObject object = new JSONObject();
+    object.put("script", script);
+    return Response.ok(object).build();
+  }
+
+  /**
+   * Delete single item
+   * @return 204 on success, or 404 if the script is not found
+   */
+  @DELETE
+  @Path("{scriptId}")
+  public Response deleteScript(@PathParam("scriptId") String scriptId) {
+    try {
+      getResourceManager().delete(scriptId);
+    } catch (ItemNotFound itemNotFound) {
+      return Response.status(404).build();
+    }
+    return Response.status(204).build();
+  }
+
+  /**
+   * Get all scripts
+   * @return 200 with {"scripts": [...]} containing only scripts owned by the current user
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getScriptList() {
+    LOG.debug("Getting all scripts");
+    List allScripts = getResourceManager().readAll(
+        new OnlyOwnersFilteringStrategy(this.context.getUsername()));
+
+    JSONObject object = new JSONObject();
+    object.put("scripts", allScripts);
+    return Response.ok(object).build();
+  }
+
+  /**
+   * Update item
+   * @return 204 on success, 404 if the script is not found,
+   *         a bad-request response if the body carries no script
+   */
+  @PUT
+  @Path("{scriptId}")
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response updateScript(PigScriptRequest request,
+                               @PathParam("scriptId") String scriptId) {
+    // A missing body would otherwise surface as NullPointerException
+    if (request == null || request.script == null) {
+      return badRequestResponse("No script specified;");
+    }
+    try {
+      getResourceManager().update(request.script, scriptId);
+    } catch (ItemNotFound itemNotFound) {
+      return Response.status(404).build();
+    }
+    return Response.status(204).build();
+  }
+
+  /**
+   * Create script
+   * @return 201 with {"script": ...} and a Location header pointing to the new script,
+   *         a bad-request response if the body carries no script
+   */
+  @POST
+  @Consumes(MediaType.APPLICATION_JSON)
+  public Response saveScript(PigScriptRequest request, @Context HttpServletResponse response,
+                             @Context UriInfo ui) {
+    // A missing body would otherwise surface as NullPointerException
+    if (request == null || request.script == null) {
+      return badRequestResponse("No script specified;");
+    }
+    getResourceManager().create(request.script);
+
+    PigScript script = null;
+
+    try {
+      script = getResourceManager().read(request.script.getId());
+    } catch (ItemNotFound itemNotFound) {
+      return Response.status(404).build();
+    }
+
+    response.setHeader("Location",
+        String.format("%s/%s", ui.getAbsolutePath().toString(), request.script.getId()));
+
+    JSONObject object = new JSONObject();
+    object.put("script", script);
+    return Response.ok(object).status(201).build();
+  }
+
+  /**
+   * Wrapper of the script request body: {"script": {...}}
+   */
+  public static class PigScriptRequest {
+    public PigScript script;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/models/PigScript.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/models/PigScript.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/models/PigScript.java
new file mode 100644
index 0000000..1c69adb
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/scripts/models/PigScript.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.scripts.models;
+
+import org.apache.ambari.view.pig.persistence.utils.PersonalResource;
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Bean to represent script
+ */
+public class PigScript implements Serializable, PersonalResource {
+ String id;
+
+ String title = "";
+ String pigScript = "";
+ String pythonScript = "";
+ String templetonArguments = "";
+ Date dateCreated;
+ String owner = "";
+
+ boolean opened = false;
+
+ public PigScript() {
+ }
+
+ public PigScript(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
+ BeanUtils.populate(this, stringObjectMap);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof PigScript)) return false;
+
+ PigScript pigScript = (PigScript) o;
+
+ if (!id.equals(pigScript.id)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public void setTitle(String title) {
+ this.title = title;
+ }
+
+ public String getPigScript() {
+ return pigScript;
+ }
+
+ public void setPigScript(String pigScript) {
+ this.pigScript = pigScript;
+ }
+
+ public String getTempletonArguments() {
+ return templetonArguments;
+ }
+
+ public void setTempletonArguments(String templetonArguments) {
+ this.templetonArguments = templetonArguments;
+ }
+
+ public Date getDateCreated() {
+ return dateCreated;
+ }
+
+ public void setDateCreated(Date dateCreated) {
+ this.dateCreated = dateCreated;
+ }
+
+ public boolean isOpened() {
+ return opened;
+ }
+
+ public void setOpened(boolean opened) {
+ this.opened = opened;
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
+
+ public String getPythonScript() {
+ return pythonScript;
+ }
+
+ public void setPythonScript(String pythonScript) {
+ this.pythonScript = pythonScript;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceManager.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceManager.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceManager.java
new file mode 100644
index 0000000..62e389e
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceManager.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.udf;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager;
+import org.apache.ambari.view.pig.resources.udf.models.UDF;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ /**
+  * CRUD resource manager specialised for {@link UDF} objects; all behavior
+  * comes from {@link PersonalCRUDResourceManager}.
+  */
+ public class UDFResourceManager extends PersonalCRUDResourceManager<UDF> {
+   private static final Logger LOG = LoggerFactory.getLogger(UDFResourceManager.class);
+
+   /**
+    * @param context view context passed through to the parent manager
+    */
+   public UDFResourceManager(ViewContext context) {
+     super(UDF.class, context);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceProvider.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceProvider.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceProvider.java
new file mode 100644
index 0000000..3069ddd
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFResourceProvider.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.udf;
+
+import com.google.inject.Inject;
+import org.apache.ambari.view.*;
+import org.apache.ambari.view.pig.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager;
+import org.apache.ambari.view.pig.resources.udf.models.UDF;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class UDFResourceProvider implements ResourceProvider<UDF> {
+ @Inject
+ ViewContext context;
+
+ protected UDFResourceManager resourceManager = null;
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(UDFResourceProvider.class);
+
+ protected synchronized PersonalCRUDResourceManager<UDF> getResourceManager() {
+ if (resourceManager == null) {
+ resourceManager = new UDFResourceManager(context);
+ }
+ return resourceManager;
+ }
+
+ @Override
+ public UDF getResource(String resourceId, Set<String> properties) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ try {
+ return getResourceManager().read(resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ }
+
+ @Override
+ public Set<UDF> getResources(ReadRequest readRequest) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ return new HashSet<UDF>(getResourceManager().readAll(
+ new OnlyOwnersFilteringStrategy(this.context.getUsername())));
+ }
+
+ @Override
+ public void createResource(String s, Map<String, Object> stringObjectMap) throws SystemException, ResourceAlreadyExistsException, NoSuchResourceException, UnsupportedPropertyException {
+ UDF udf = null;
+ try {
+ udf = new UDF(stringObjectMap);
+ } catch (InvocationTargetException e) {
+ throw new SystemException("error on creating resource", e);
+ } catch (IllegalAccessException e) {
+ throw new SystemException("error on creating resource", e);
+ }
+ getResourceManager().create(udf);
+ }
+
+ @Override
+ public boolean updateResource(String resourceId, Map<String, Object> stringObjectMap) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ UDF udf = null;
+ try {
+ udf = new UDF(stringObjectMap);
+ } catch (InvocationTargetException e) {
+ throw new SystemException("error on updating resource", e);
+ } catch (IllegalAccessException e) {
+ throw new SystemException("error on updating resource", e);
+ }
+ try {
+ getResourceManager().update(udf, resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ return true;
+ }
+
+ @Override
+ public boolean deleteResource(String resourceId) throws SystemException, NoSuchResourceException, UnsupportedPropertyException {
+ try {
+ getResourceManager().delete(resourceId);
+ } catch (ItemNotFound itemNotFound) {
+ throw new NoSuchResourceException(resourceId);
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFService.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFService.java
new file mode 100644
index 0000000..d8b24bc
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/UDFService.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.udf;
+
+import com.google.inject.Inject;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.pig.persistence.utils.ItemNotFound;
+import org.apache.ambari.view.pig.persistence.utils.OnlyOwnersFilteringStrategy;
+import org.apache.ambari.view.pig.resources.PersonalCRUDResourceManager;
+import org.apache.ambari.view.pig.resources.udf.models.UDF;
+import org.apache.ambari.view.pig.services.BaseService;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.*;
+import javax.ws.rs.core.*;
+import java.util.List;
+
+/**
+ * Servlet for UDFs
+ * API:
+ * GET /
+ * get all UDFs
+ * GET /:id
+ * get one UDF
+ * PUT /:id
+ * update UDF
+ * POST /
+ * create new UDF
+ * Required: path, name
+ */
+public class UDFService extends BaseService {
+ @Inject
+ ViewResourceHandler handler;
+
+ protected UDFResourceManager resourceManager = null;
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(UDFService.class);
+
+ protected synchronized PersonalCRUDResourceManager<UDF> getResourceManager() {
+ if (resourceManager == null) {
+ resourceManager = new UDFResourceManager(context);
+ }
+ return resourceManager;
+ }
+
+ /**
+ * Get single item
+ */
+ @GET
+ @Path("{udfId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getUDF(@PathParam("udfId") String udfId) {
+ UDF udf = null;
+ try {
+ udf = getResourceManager().read(udfId);
+ } catch (ItemNotFound itemNotFound) {
+ return Response.status(404).build();
+ }
+ JSONObject object = new JSONObject();
+ object.put("udf", udf);
+ return Response.ok(object).build();
+ }
+
+ /**
+ * Delete single item
+ */
+ @DELETE
+ @Path("{udfId}")
+ public Response deleteUDF(@PathParam("udfId") String udfId) {
+ try {
+ getResourceManager().delete(udfId);
+ } catch (ItemNotFound itemNotFound) {
+ return Response.status(404).build();
+ }
+ return Response.status(204).build();
+ }
+
+ /**
+ * Get all UDFs
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getUDFList(@Context UriInfo ui) {
+ LOG.debug("Getting all UDFs");
+ List allUDFs = getResourceManager().readAll(
+ new OnlyOwnersFilteringStrategy(this.context.getUsername()));
+
+ JSONObject object = new JSONObject();
+ object.put("udfs", allUDFs);
+ return Response.ok(object).build();
+ }
+
+ /**
+ * Update item
+ */
+ @PUT
+ @Path("{udfId}")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response updateUDF(UDFRequest request,
+ @PathParam("udfId") String udfId) {
+ try {
+ getResourceManager().update(request.udf, udfId);
+ } catch (ItemNotFound itemNotFound) {
+ return Response.status(404).build();
+ }
+ return Response.status(204).build();
+ }
+
+ /**
+ * Create UDF
+ */
+ @POST
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response createUDF(UDFRequest request, @Context HttpServletResponse response,
+ @Context UriInfo ui) {
+ getResourceManager().create(request.udf);
+
+ UDF udf = null;
+
+ try {
+ udf = getResourceManager().read(request.udf.getId());
+ } catch (ItemNotFound itemNotFound) {
+ return Response.status(404).build();
+ }
+
+ response.setHeader("Location",
+ String.format("%s/%s", ui.getAbsolutePath().toString(), request.udf.getId()));
+
+ JSONObject object = new JSONObject();
+ object.put("udf", udf);
+ return Response.ok(object).status(201).build();
+ }
+
+ public static class UDFRequest {
+ public UDF udf;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/models/UDF.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/models/UDF.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/models/UDF.java
new file mode 100644
index 0000000..0a18329
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/resources/udf/models/UDF.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.resources.udf.models;
+
+import org.apache.ambari.view.pig.persistence.utils.PersonalResource;
+import org.apache.commons.beanutils.BeanUtils;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Map;
+
+/**
+ * Bean to represent User Defined Functions
+ */
+public class UDF implements Serializable, PersonalResource {
+ String id;
+ String path;
+ String name;
+ String owner;
+
+ public UDF() {
+ }
+
+ public UDF(Map<String, Object> stringObjectMap) throws InvocationTargetException, IllegalAccessException {
+ BeanUtils.populate(this, stringObjectMap);
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getOwner() {
+ return owner;
+ }
+
+ @Override
+ public void setOwner(String owner) {
+ this.owner = owner;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/BaseService.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/BaseService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/BaseService.java
new file mode 100644
index 0000000..b37c518
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/BaseService.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.services;
+
+import com.google.inject.Inject;
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.apache.ambari.view.pig.persistence.Storage;
+import org.apache.ambari.view.pig.utils.HdfsApi;
+import org.apache.ambari.view.pig.persistence.utils.StorageUtil;
+import org.json.simple.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.ws.WebServiceException;
+import java.io.IOException;
+import java.util.HashMap;
+
+
+public class BaseService {
+ @Inject
+ protected ViewContext context;
+
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(BaseService.class);
+
+ private Storage storage;
+
+ public Storage getStorage() {
+ if (this.storage == null) {
+ storage = StorageUtil.getStorage(context);
+ }
+ return storage;
+ }
+
+ public void setStorage(Storage storage) {
+ this.storage = storage;
+ }
+
+ private static HdfsApi hdfsApi = null;
+
+ public static HdfsApi getHdfsApi(ViewContext context) {
+ if (hdfsApi == null) {
+ Thread.currentThread().setContextClassLoader(null);
+
+ String userName = context.getUsername();
+
+ String defaultFS = context.getProperties().get("dataworker.defaultFs");
+ if (defaultFS == null) {
+ String message = "dataworker.defaultFs is not configured!";
+ LOG.error(message);
+ throw new WebServiceException(message);
+ }
+
+ try {
+ hdfsApi = new HdfsApi(defaultFS, userName);
+ LOG.info("HdfsApi connected OK");
+ } catch (IOException e) {
+ String message = "HdfsApi IO error: " + e.getMessage();
+ LOG.error(message);
+ throw new WebServiceException(message, e);
+ } catch (InterruptedException e) {
+ String message = "HdfsApi Interrupted error: " + e.getMessage();
+ LOG.error(message);
+ throw new WebServiceException(message, e);
+ }
+ }
+ return hdfsApi;
+ }
+
+ public HdfsApi getHdfsApi() {
+ return getHdfsApi(context);
+ }
+
+ public static HdfsApi setHdfsApi(HdfsApi api) {
+ return hdfsApi = api;
+ }
+
+ public static Response badRequestResponse(String message) {
+ HashMap<String, Object> response = new HashMap<String, Object>();
+ response.put("message", message);
+ response.put("status", 400);
+ return Response.status(400).entity(new JSONObject(response)).type(MediaType.APPLICATION_JSON).build();
+ }
+
+ public static Response serverErrorResponse(String message) {
+ HashMap<String, Object> response = new HashMap<String, Object>();
+ response.put("message", message);
+ response.put("status", 500);
+ return Response.status(500).entity(new JSONObject(response)).type(MediaType.APPLICATION_JSON).build();
+ }
+
+ public static Response notFoundResponse(String message) {
+ HashMap<String, Object> response = new HashMap<String, Object>();
+ response.put("message", message);
+ response.put("status", 404);
+ return Response.status(404).entity(new JSONObject(response)).type(MediaType.APPLICATION_JSON).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/HelpService.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/HelpService.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/HelpService.java
new file mode 100644
index 0000000..c5f1721
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/services/HelpService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.services;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.ViewResourceHandler;
+import org.json.simple.JSONObject;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.*;
+
+ /**
+  * Help endpoints: exposes the configured default file system and a version
+  * string.
+  */
+ public class HelpService extends BaseService {
+   // No own "context" field: a private field of that name would hide the
+   // protected BaseService.context, leaving the inherited field null for the
+   // inherited getStorage()/getHdfsApi() when this service is built by hand.
+   private ViewResourceHandler handler;
+
+   /**
+    * @param context view context; stored in the field inherited from {@link BaseService}
+    * @param handler view resource handler
+    */
+   public HelpService(ViewContext context, ViewResourceHandler handler) {
+     super();
+     this.context = context;
+     this.handler = handler;
+   }
+
+   /**
+    * Returns a JSON object holding the value of the
+    * {@code dataworker.defaultFs} view property under the same key.
+    */
+   @GET
+   @Path("/config")
+   @Produces(MediaType.APPLICATION_JSON)
+   public Response config(){
+     JSONObject object = new JSONObject();
+     String fs = context.getProperties().get("dataworker.defaultFs");
+     object.put("dataworker.defaultFs", fs);
+     return Response.ok(object).build();
+   }
+
+   /**
+    * Returns the hard-coded version string as plain text.
+    */
+   @GET
+   @Path("/version")
+   @Produces(MediaType.TEXT_PLAIN)
+   public Response version(){
+     return Response.ok("0.0.1-SNAPSHOT").build();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/Request.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/Request.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/Request.java
new file mode 100644
index 0000000..de9142f
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/Request.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.templeton.client;
+
+import com.google.gson.Gson;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.ambari.view.URLStreamProvider;
+import org.apache.ambari.view.ViewContext;
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Request handler, supports GET, POST, PUT, DELETE methods
+ * @param <RESPONSE> data type to deserialize response from JSON
+ */
+public class Request<RESPONSE> {
+ protected final Class<RESPONSE> responseClass;
+ protected final ViewContext context;
+ protected final WebResource resource;
+
+ protected final Gson gson = new Gson();
+
+ protected final static Logger LOG =
+ LoggerFactory.getLogger(Request.class);
+
+ public Request(WebResource resource, Class<RESPONSE> responseClass, ViewContext context) {
+ this.resource = resource;
+ this.responseClass = responseClass;
+ this.context = context;
+ }
+
+ /**
+ * Main implementation of GET request
+ * @param resource resource
+ * @return unmarshalled response data
+ */
+ public RESPONSE get(WebResource resource) throws IOException {
+ LOG.debug("GET " + resource.toString());
+
+ InputStream inputStream = context.getURLStreamProvider().readFrom(resource.toString(), "GET",
+ null, new HashMap<String, String>());
+
+ String responseJson = IOUtils.toString(inputStream);
+ LOG.debug(String.format("RESPONSE => %s", responseJson));
+ return gson.fromJson(responseJson, responseClass);
+ }
+
+ public RESPONSE get() throws IOException {
+ return get(this.resource);
+ }
+
+ public RESPONSE get(MultivaluedMapImpl params) throws IOException {
+ return get(this.resource.queryParams(params));
+ }
+
+ /**
+ * Main implementation of POST request
+ * @param resource resource
+ * @param data post body
+ * @return unmarshalled response data
+ */
+ public RESPONSE post(WebResource resource, MultivaluedMapImpl data) throws IOException {
+ LOG.debug("POST " + resource.toString());
+ LOG.debug("data: " + data.toString());
+
+ UriBuilder builder = UriBuilder.fromPath("host/");
+ for(String key : data.keySet()) {
+ for(String value : data.get(key))
+ builder.queryParam(key, value);
+ }
+
+ if (data != null)
+ LOG.debug("... data: " + builder.build().getRawQuery());
+
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("Content-Type", "application/x-www-form-urlencoded");
+
+ InputStream inputStream = context.getURLStreamProvider().readFrom(resource.toString(),
+ "POST", builder.build().getRawQuery(), headers);
+ String responseJson = IOUtils.toString(inputStream);
+
+ LOG.debug(String.format("RESPONSE => %s", responseJson));
+ return gson.fromJson(responseJson, responseClass);
+ }
+
+ public RESPONSE post(MultivaluedMapImpl data) throws IOException {
+ return post(resource, data);
+ }
+
+ public RESPONSE post() throws IOException {
+ return post(resource, new MultivaluedMapImpl());
+ }
+
+ public RESPONSE post(MultivaluedMapImpl params, MultivaluedMapImpl data) throws IOException {
+ return post(resource.queryParams(params), data);
+ }
+
+ public static void main(String[] args) {
+ UriBuilder builder = UriBuilder.fromPath("host/");
+ builder.queryParam("aa", "/tmp/.pigjobs/hue/test111_17-03-2014-16-50-37");
+ System.out.println(builder.build().getRawQuery());
+ }
+
+ /**
+ * Main implementation of PUT request
+ * @param resource resource
+ * @param data put body
+ * @return unmarshalled response data
+ */
+ public RESPONSE put(WebResource resource, MultivaluedMapImpl data) throws IOException {
+ LOG.debug("PUT " + resource.toString());
+
+ UriBuilder builder = UriBuilder.fromPath("host/");
+ for(String key : data.keySet()) {
+ for(String value : data.get(key))
+ builder.queryParam(key, value);
+ }
+
+ if (data != null)
+ LOG.debug("... data: " + builder.build().getRawQuery());
+
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("Content-Type", "application/x-www-form-urlencoded");
+
+ InputStream inputStream = context.getURLStreamProvider().readFrom(resource.toString(),
+ "PUT", builder.build().getRawQuery(), headers);
+ String responseJson = IOUtils.toString(inputStream);
+
+ LOG.debug(String.format("RESPONSE => %s", responseJson));
+ return gson.fromJson(responseJson, responseClass);
+ }
+
+ public RESPONSE put(MultivaluedMapImpl data) throws IOException {
+ return put(resource, data);
+ }
+
+ public RESPONSE put() throws IOException {
+ return put(resource, new MultivaluedMapImpl());
+ }
+
+ public RESPONSE put(MultivaluedMapImpl params, MultivaluedMapImpl data) throws IOException {
+ return put(resource.queryParams(params), data);
+ }
+
+ /**
+ * Main implementation of DELETE request
+ * @param resource resource
+ * @param data delete body
+ * @return unmarshalled response data
+ */
+ public RESPONSE delete(WebResource resource, MultivaluedMapImpl data) throws IOException {
+ LOG.debug("DELETE " + resource.toString());
+
+ UriBuilder builder = UriBuilder.fromPath("host/");
+ for(String key : data.keySet()) {
+ for(String value : data.get(key))
+ builder.queryParam(key, value);
+ }
+
+ if (data != null)
+ LOG.debug("... data: " + builder.build().getRawQuery());
+
+ Map<String, String> headers = new HashMap<String, String>();
+ headers.put("Content-Type", "application/x-www-form-urlencoded");
+
+ InputStream inputStream = context.getURLStreamProvider().readFrom(resource.toString(),
+ "DELETE", builder.build().getRawQuery(), headers);
+ String responseJson = IOUtils.toString(inputStream);
+
+ LOG.debug(String.format("RESPONSE => %s", responseJson));
+ return gson.fromJson(responseJson, responseClass);
+ }
+
+ public RESPONSE delete(MultivaluedMapImpl data) throws IOException {
+ return delete(resource, data);
+ }
+
+ public RESPONSE delete() throws IOException {
+ return delete(resource, new MultivaluedMapImpl());
+ }
+
+ public RESPONSE delete(MultivaluedMapImpl params, MultivaluedMapImpl data) throws IOException {
+ return delete(resource.queryParams(params), data);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonApi.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonApi.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonApi.java
new file mode 100644
index 0000000..9675a1e
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonApi.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.templeton.client;
+
+import com.google.gson.Gson;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.ambari.view.ViewContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+//TODO: extract to separate JAR outside ambari-views scope
+/**
+ * Templeton Business Delegate.
+ *
+ * Builds a {@link TempletonRequest} for each supported operation (run a Pig job,
+ * check a job, kill a job, query service status) and always sends the configured
+ * user name and doAs value along with it.
+ */
+public class TempletonApi {
+  private final Gson gson = new Gson();
+
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(TempletonApi.class);
+
+  protected WebResource service;
+  private final String username;
+  private final String doAs;
+  private final ViewContext context;
+
+  /**
+   * TempletonApi constructor
+   * @param api dataworker.templeton_url
+   * @param username templeton username
+   * @param doAs doAs argument
+   * @param context context with URLStreamProvider
+   */
+  public TempletonApi(String api, String username, String doAs, ViewContext context) {
+    this.username = username;
+    this.doAs = doAs;
+    this.context = context;
+    ClientConfig config = new DefaultClientConfig();
+    config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, Boolean.TRUE);
+    Client client = Client.create(config);
+    this.service = client.resource(api);
+  }
+
+  /**
+   * TempletonApi constructor that uses the user name as the doAs argument as well
+   * @param api dataworker.templeton_url
+   * @param username templeton username, also used as doAs
+   * @param context context with URLStreamProvider
+   */
+  public TempletonApi(String api, String username, ViewContext context) {
+    this(api, username, username, context);
+  }
+
+  /**
+   * Create and queue a Pig job. Parameters that are null are left out of the request.
+   * @param execute String containing an entire, short pig program to run. (e.g. pwd)
+   * @param pigFile HDFS file name of a pig program to run. (One of either "execute" or "file" is required )
+   * @param statusDir A directory where Templeton will write the status of the Pig job. If
+   *                  provided, it is the caller's responsibility to remove this directory when done.
+   * @param arg Optional program arguments separated by tab characters; every piece is sent
+   *            as its own "arg" value. May be null or empty.
+   * @return job data holding the id of the queued job (similar to "job_201110132141_0001")
+   * @throws IOException if the request fails
+   */
+  public JobData runPigQuery(String execute, File pigFile, String statusDir, String arg) throws IOException {
+    MultivaluedMapImpl data = new MultivaluedMapImpl();
+    if (execute != null) {
+      data.add("execute", execute);
+    }
+    if (pigFile != null) {
+      data.add("file", pigFile.toString());
+    }
+    if (statusDir != null) {
+      data.add("statusdir", statusDir);
+    }
+    if (arg != null && !arg.isEmpty()) {
+      for (String arg1 : arg.split("\t")) {
+        data.add("arg", arg1);
+      }
+    }
+
+    TempletonRequest<JobData> request =
+        new TempletonRequest<JobData>(service.path("pig"), JobData.class, username, doAs, context);
+
+    return request.post(data);
+  }
+
+  /**
+   * Create and queue a Pig job from a script file.
+   * @see #runPigQuery(String, File, String, String)
+   */
+  public JobData runPigQuery(File pigFile, String statusDir, String arg) throws IOException {
+    return runPigQuery(null, pigFile, statusDir, arg);
+  }
+
+  /**
+   * Create and queue a Pig job from an inline program.
+   * @see #runPigQuery(String, File, String, String)
+   */
+  public JobData runPigQuery(String execute, String statusDir, String arg) throws IOException {
+    return runPigQuery(execute, null, statusDir, arg);
+  }
+
+  /**
+   * Create and queue a Pig job from an inline program without status directory or arguments.
+   * @see #runPigQuery(String, File, String, String)
+   */
+  public JobData runPigQuery(String execute) throws IOException {
+    return runPigQuery(execute, null, null, null);
+  }
+
+  /**
+   * Retrieve information about a job.
+   * @param jobId job id
+   * @return job information
+   * @throws IOException if the request fails
+   */
+  public JobInfo checkJob(String jobId) throws IOException {
+    // Pass doAs explicitly, like runPigQuery and status do; the shorter TempletonRequest
+    // constructor would substitute the user name and drop the configured doAs.
+    TempletonRequest<JobInfo> request =
+        new TempletonRequest<JobInfo>(service.path("jobs").path(jobId), JobInfo.class,
+            username, doAs, context);
+
+    return request.get();
+  }
+
+  /**
+   * Kill a job. Failures of the DELETE request are logged and not propagated.
+   * @param jobId job id
+   * @throws IOException declared for callers; the DELETE failure itself is swallowed
+   */
+  public void killJob(String jobId) throws IOException {
+    // Pass doAs explicitly so the kill is issued with the same identity as the submission.
+    TempletonRequest<JobInfo> request =
+        new TempletonRequest<JobInfo>(service.path("jobs").path(jobId), JobInfo.class,
+            username, doAs, context);
+
+    try {
+      request.delete();
+    } catch (IOException e) {
+      //TODO: remove this after HIVE-5835 resolved
+      // NOTE(review): this catches every IOException, not only the 500 response,
+      // so the exception is attached to the log record to keep other failures visible.
+      LOG.debug("Ignoring 500 response from webhcat (see HIVE-5835)", e);
+    }
+  }
+
+  /**
+   * Query the "status" resource of the service.
+   * @return status data
+   * @throws IOException if the request fails
+   */
+  public Status status() throws IOException {
+    TempletonRequest<Status> request =
+        new TempletonRequest<Status>(service.path("status"), Status.class,
+            username, doAs, context);
+    return request.get();
+  }
+
+  // NOTE(review): the response holders below are inner (non-static) classes. They are left
+  // that way because callers may create them through an enclosing instance; confirm before
+  // converting them to static nested classes.
+
+  /** Response of the status request. */
+  public class Status {
+    public String status;
+    public String version;
+  }
+
+  /** Response of a job submission. */
+  public class JobData {
+    public String id;
+  }
+
+  /** Response of a job information request. */
+  public class JobInfo {
+    public Map<String, Object> status;
+    public Map<String, Object> profile;
+    public Map<String, Object> userargs;
+
+    public String id;
+    public String parentId;
+    public String percentComplete;
+    public Integer exitValue;
+    public String user;
+    public String callback;
+    public String completed;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonRequest.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonRequest.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonRequest.java
new file mode 100644
index 0000000..38ec211
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/templeton/client/TempletonRequest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.templeton.client;
+
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.ambari.view.ViewContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Request specialization that appends the user.name and doAs
+ * query parameters to the target resource of every call
+ * @param <RESPONSE> data type to deserialize response from JSON
+ */
+public class TempletonRequest<RESPONSE> extends Request<RESPONSE> {
+  protected final static Logger LOG =
+      LoggerFactory.getLogger(TempletonRequest.class);
+
+  private String username;
+  private String doAs;
+
+  /**
+   * Creates a request where the user name doubles as the doAs value.
+   */
+  public TempletonRequest(WebResource resource, Class<RESPONSE> responseClass,
+                          String username, ViewContext context) {
+    this(resource, responseClass, username, username, context);
+  }
+
+  /**
+   * Creates a request with separate user name and doAs values.
+   */
+  public TempletonRequest(WebResource resource, Class<RESPONSE> responseClass,
+                          String username, String doAs, ViewContext context) {
+    super(resource, responseClass, context);
+    this.username = username;
+    this.doAs = doAs;
+  }
+
+  /** GET with the identity parameters added to the resource. */
+  public RESPONSE get(WebResource resource) throws IOException {
+    WebResource identified = resource.queryParams(identityParams());
+    return super.get(identified);
+  }
+
+  /** PUT with the identity parameters added to the resource. */
+  public RESPONSE put(WebResource resource, MultivaluedMapImpl data) throws IOException {
+    WebResource identified = resource.queryParams(identityParams());
+    return super.put(identified, data);
+  }
+
+  /** DELETE with the identity parameters added to the resource. */
+  public RESPONSE delete(WebResource resource, MultivaluedMapImpl data) throws IOException {
+    WebResource identified = resource.queryParams(identityParams());
+    return super.delete(identified, data);
+  }
+
+  /** POST with the identity parameters added to the resource. */
+  public RESPONSE post(WebResource resource, MultivaluedMapImpl data) throws IOException {
+    WebResource identified = resource.queryParams(identityParams());
+    return super.post(identified, data);
+  }
+
+  /** Builds a fresh parameter map holding user.name and doAs. */
+  private MultivaluedMapImpl identityParams() {
+    MultivaluedMapImpl identity = new MultivaluedMapImpl();
+    identity.add("user.name", username);
+    identity.add("doAs", doAs);
+    return identity;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/5eb22214/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/utils/FilePaginator.java
----------------------------------------------------------------------
diff --git a/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/utils/FilePaginator.java b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/utils/FilePaginator.java
new file mode 100644
index 0000000..9312204
--- /dev/null
+++ b/contrib/views/pig/src/main/java/org/apache/ambari/view/pig/utils/FilePaginator.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.view.pig.utils;
+
+import org.apache.ambari.view.ViewContext;
+import org.apache.ambari.view.pig.services.BaseService;
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import static java.lang.Math.ceil;
+
+/**
+ * Reads a file through {@link HdfsApi} in fixed-size pages and decodes every page as UTF-8.
+ */
+public class FilePaginator {
+  private static int PAGE_SIZE = 1*1024*1024;  // 1MB
+
+  private String filePath;
+  private HdfsApi hdfsApi;
+
+  /**
+   * @param filePath path of the file to paginate
+   * @param context view context used to obtain the HdfsApi
+   */
+  public FilePaginator(String filePath, ViewContext context) {
+    this.filePath = filePath;
+    hdfsApi = BaseService.getHdfsApi(context);
+  }
+
+  /**
+   * Changes the page size used by all paginators.
+   * @param PAGE_SIZE new page size in bytes
+   */
+  public static void setPageSize(int PAGE_SIZE) {
+    FilePaginator.PAGE_SIZE = PAGE_SIZE;
+  }
+
+  /**
+   * @return number of pages needed to cover the file, i.e. its length divided by
+   *         the page size, rounded up
+   */
+  public long pageCount() throws IOException, InterruptedException {
+    return (long)
+        ceil( hdfsApi.getFileStatus(filePath).getLen() / ((double)PAGE_SIZE) );
+  }
+
+  /**
+   * Reads one page of the file.
+   *
+   * A page is cut at a byte offset, so a multi-byte UTF-8 sequence that straddles a
+   * page boundary is decoded separately on each side.
+   * @param page zero-based page index
+   * @return page content decoded as UTF-8; empty string for page 0 of an empty file
+   * @throws IllegalArgumentException if seeking to the page fails or the page holds no data
+   * @throws IOException if reading or closing the stream fails
+   */
+  public String readPage(long page) throws IOException, InterruptedException {
+    // PAGE_SIZE is a mutable static; take one snapshot so the buffer size, the seek
+    // offset and the read bounds cannot disagree within a single call.
+    final int pageSize = PAGE_SIZE;
+
+    FSDataInputStream stream = hdfsApi.open(filePath);
+    try {
+      try {
+        stream.seek(page * pageSize);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Page " + page + " does not exists", e);
+      }
+
+      byte[] buffer = new byte[pageSize];
+      int read = 0;
+      // A single read may return fewer bytes than requested; loop until the page is
+      // full or the end of the file is reached.
+      while (read < pageSize) {
+        int readCount = stream.read(buffer, read, pageSize - read);
+        if (readCount == -1) {
+          break;
+        }
+        read += readCount;
+      }
+
+      if (read != 0) {
+        return new String(buffer, 0, read, Charset.forName("UTF-8"));
+      }
+      if (page == 0) {
+        return "";
+      }
+      throw new IllegalArgumentException("Page " + page + " does not exists");
+    } finally {
+      // Release the stream on every path: success, missing page and I/O failure.
+      stream.close();
+    }
+  }
+}