You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/07/24 00:39:50 UTC
incubator-griffin git commit: [GRIFFIN-184] service for download miss
records
Repository: incubator-griffin
Updated Branches:
refs/heads/master aa6a5a6f7 -> f0ee5ec92
[GRIFFIN-184] service for download miss records
GRIFFIN-184 - service for download miss records
Author: Li, Juan <su...@gmail.com>
Closes #365 from icesmartjuan/feature/download.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/f0ee5ec9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/f0ee5ec9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/f0ee5ec9
Branch: refs/heads/master
Commit: f0ee5ec9261edb50cb4cb8a69edd348feb953b52
Parents: aa6a5a6
Author: Li, Juan <su...@gmail.com>
Authored: Tue Jul 24 08:39:42 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Tue Jul 24 08:39:42 2018 +0800
----------------------------------------------------------------------
.../apache/griffin/core/job/JobController.java | 5 +--
.../org/apache/griffin/core/job/JobService.java | 2 ++
.../apache/griffin/core/job/JobServiceImpl.java | 35 ++++++++++++++++++++
.../org/apache/griffin/core/util/FSUtil.java | 20 ++++++++++-
4 files changed, 59 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index 1d0a8ac..2f65b63 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -87,8 +87,9 @@ public class JobController {
}
@RequestMapping(path = "/jobs/download", method = RequestMethod.GET)
- public ResponseEntity<Resource> download(@RequestParam("hdfsPath") String hdfsPath) throws IOException {
- InputStreamResource resource = new InputStreamResource(FSUtil.getSampleInputStream(hdfsPath));
+ public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName ,@RequestParam("ts") long timestamp) throws Exception {
+ String path = jobService.getJobHdfsPersistPath(jobName,timestamp);
+ InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path));
return ResponseEntity.ok().
header("content-disposition", "attachment; filename = sampleMissingData.json")
.contentType(MediaType.APPLICATION_OCTET_STREAM)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java
index 7539cea..269f004 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobService.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java
@@ -42,4 +42,6 @@ public interface JobService {
List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size);
JobHealth getHealthInfo();
+
+ String getJobHdfsPersistPath(String jobName, long timestamp);
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 8e68567..29cb249 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -20,6 +20,7 @@ under the License.
package org.apache.griffin.core.job;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang.StringUtils;
import org.apache.griffin.core.exception.GriffinException;
import org.apache.griffin.core.job.entity.*;
@@ -33,6 +34,8 @@ import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
import org.apache.griffin.core.util.JsonUtil;
import org.apache.griffin.core.util.YarnNetUtil;
+import org.json.JSONArray;
+import org.json.JSONObject;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +58,8 @@ import java.util.List;
import java.util.TimeZone;
import static java.util.TimeZone.getTimeZone;
+import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
+import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.*;
import static org.apache.griffin.core.job.entity.LivySessionStates.State.*;
import static org.apache.griffin.core.job.entity.LivySessionStates.isActive;
@@ -511,4 +516,34 @@ public class JobServiceImpl implements JobService {
List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable);
return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState());
}
+
+ @Override
+ public String getJobHdfsPersistPath(String jobName, long timestamp) {
+ List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false);
+ if (jobList.size() == 0) {
+ return null;
+ }
+ if (jobList.get(0).getType().toLowerCase().equals("batch")) {
+ return getPersistPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + "";
+ }
+
+ return getPersistPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + "";
+ }
+
+    /**
+     * Extracts the configured path of the first "hdfs" entry in the "persist"
+     * array of the given environment JSON.
+     *
+     * @param jsonString environment definition as a JSON object string
+     * @return the {@code config.path} value of the first persist entry whose type is
+     *         "hdfs"; {@code null} when there is no such entry or when reading the
+     *         JSON fails (the failure is logged)
+     */
+    private String getPersistPath(String jsonString) {
+        try {
+            JSONArray sinks = new JSONObject(jsonString).getJSONArray("persist");
+            int count = sinks.length();
+            for (int idx = 0; idx < count; idx++) {
+                JSONObject sink = sinks.getJSONObject(idx);
+                if (sink.get("type").equals("hdfs")) {
+                    return sink.getJSONObject("config").getString("path");
+                }
+            }
+        } catch (Exception ex) {
+            LOGGER.error("Fail to get Persist path from {}", jsonString, ex);
+        }
+        // Reached when no hdfs entry exists or when an exception was logged above.
+        return null;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
index ed40ee2..f6cfd2c 100644
--- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -177,4 +176,23 @@ public class FSUtil {
}
}
+ public static String getFirstMissRecordPath(String hdfsDir) throws Exception{
+ List<FileStatus> fileList = listFileStatus(hdfsDir);
+ for(int i=0; i<fileList.size();i++){
+ if(fileList.get(i).getPath().toUri().toString().toLowerCase().contains("missrecord")){
+ return fileList.get(i).getPath().toUri().toString();
+ }
+ }
+ return null;
+ }
+
+ public static InputStream getMissSampleInputStream(String path) throws Exception {
+ List<String> subDirList = listSubDir(path);
+ //FIXME: only handle 1-sub dir here now
+ for(int i=0; i< subDirList.size();i++){
+ return getSampleInputStream(getFirstMissRecordPath(subDirList.get(i)));
+ }
+ return getSampleInputStream(getFirstMissRecordPath(path));
+ }
+
}