You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2018/07/24 00:39:50 UTC

incubator-griffin git commit: [GRIFFIN-184] service for download miss records

Repository: incubator-griffin
Updated Branches:
  refs/heads/master aa6a5a6f7 -> f0ee5ec92


[GRIFFIN-184] service for download miss records

GRIFFIN-184 - service for download miss records

Author: Li, Juan <su...@gmail.com>

Closes #365 from icesmartjuan/feature/download.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/f0ee5ec9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/f0ee5ec9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/f0ee5ec9

Branch: refs/heads/master
Commit: f0ee5ec9261edb50cb4cb8a69edd348feb953b52
Parents: aa6a5a6
Author: Li, Juan <su...@gmail.com>
Authored: Tue Jul 24 08:39:42 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Tue Jul 24 08:39:42 2018 +0800

----------------------------------------------------------------------
 .../apache/griffin/core/job/JobController.java  |  5 +--
 .../org/apache/griffin/core/job/JobService.java |  2 ++
 .../apache/griffin/core/job/JobServiceImpl.java | 35 ++++++++++++++++++++
 .../org/apache/griffin/core/util/FSUtil.java    | 20 ++++++++++-
 4 files changed, 59 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index 1d0a8ac..2f65b63 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -87,8 +87,9 @@ public class JobController {
     }
 
     @RequestMapping(path = "/jobs/download", method = RequestMethod.GET)
-    public ResponseEntity<Resource> download(@RequestParam("hdfsPath") String hdfsPath) throws IOException {
-        InputStreamResource resource = new InputStreamResource(FSUtil.getSampleInputStream(hdfsPath));
+    public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName ,@RequestParam("ts") long timestamp) throws Exception {
+        String path = jobService.getJobHdfsPersistPath(jobName,timestamp);
+        InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path));
         return ResponseEntity.ok().
                 header("content-disposition", "attachment; filename = sampleMissingData.json")
                 .contentType(MediaType.APPLICATION_OCTET_STREAM)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobService.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java
index 7539cea..269f004 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobService.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java
@@ -42,4 +42,6 @@ public interface JobService {
     List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size);
 
     JobHealth getHealthInfo();
+
+    String getJobHdfsPersistPath(String jobName, long timestamp); // HDFS dir "<persist path>/<jobName>/<timestamp>"; null if no undeleted job has that name
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
index 8e68567..29cb249 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java
@@ -20,6 +20,7 @@ under the License.
 package org.apache.griffin.core.job;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang.StringUtils;
 import org.apache.griffin.core.exception.GriffinException;
 import org.apache.griffin.core.job.entity.*;
@@ -33,6 +34,8 @@ import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType;
 import org.apache.griffin.core.measure.repo.GriffinMeasureRepo;
 import org.apache.griffin.core.util.JsonUtil;
 import org.apache.griffin.core.util.YarnNetUtil;
+import org.json.JSONArray;
+import org.json.JSONObject;
 import org.quartz.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,6 +58,8 @@ import java.util.List;
 import java.util.TimeZone;
 
 import static java.util.TimeZone.getTimeZone;
+import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH;
+import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING;
 import static org.apache.griffin.core.exception.GriffinExceptionMessage.*;
 import static org.apache.griffin.core.job.entity.LivySessionStates.State.*;
 import static org.apache.griffin.core.job.entity.LivySessionStates.isActive;
@@ -511,4 +516,34 @@ public class JobServiceImpl implements JobService {
         List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable);
         return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState());
     }
+
+    /** Returns "<hdfs persist path>/<jobName>/<timestamp>"; null if the job or its hdfs persist path is missing. */
+    @Override
+    public String getJobHdfsPersistPath(String jobName, long timestamp) {
+        List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false);
+        if (CollectionUtils.isEmpty(jobList)) {
+            return null;
+        }
+        // First matching job picks the env config; any type other than "batch" (even null) means streaming.
+        String env = "batch".equalsIgnoreCase(jobList.get(0).getType()) ? ENV_BATCH : ENV_STREAMING;
+        String persistPath = getPersistPath(env);
+        return persistPath == null ? null : persistPath + "/" + jobName + "/" + timestamp;
+    }
+
+    /** Reads config.path of the first "hdfs" entry in the env JSON's "persist" array; null if none or on any error. */
+    private String getPersistPath(String jsonString) {
+        String hdfsPath = null;
+        try {
+            JSONArray persists = new JSONObject(jsonString).getJSONArray("persist");
+            for (int idx = 0; idx < persists.length() && hdfsPath == null; idx++) {
+                JSONObject persist = persists.getJSONObject(idx);
+                if (persist.get("type").equals("hdfs")) {
+                    hdfsPath = persist.getJSONObject("config").getString("path");
+                }
+            }
+        } catch (Exception ex) {
+            LOGGER.error("Fail to get Persist path from {}", jsonString, ex);
+        }
+        return hdfsPath;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
index ed40ee2..f6cfd2c 100644
--- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -177,4 +176,23 @@ public class FSUtil {
         }
     }
 
+    /** Returns the URI of the first entry listed for hdfsDir whose URI contains "missrecord" (any case), or null. */
+    public static String getFirstMissRecordPath(String hdfsDir) throws Exception{
+        for (FileStatus status : listFileStatus(hdfsDir)) {
+            if (status.getPath().toUri().toString().toLowerCase(java.util.Locale.ROOT).contains("missrecord")) {
+                return status.getPath().toUri().toString(); // Locale.ROOT above keeps the match locale-independent
+            }
+        }
+        return null;
+    }
+
+    /** Returns getSampleInputStream for the first miss-record path in path's first sub dir, or in path if it has none. */
+    public static InputStream getMissSampleInputStream(String path) throws Exception {
+        List<String> subDirList = listSubDir(path);
+        //FIXME: only handle 1-sub dir here now
+        String dir = subDirList.isEmpty() ? path : subDirList.get(0);
+        // NOTE(review): getFirstMissRecordPath may return null; confirm getSampleInputStream copes with that.
+        return getSampleInputStream(getFirstMissRecordPath(dir));
+    }
+
 }