You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kylin.apache.org by GitBox <gi...@apache.org> on 2018/12/12 02:51:31 UTC

[GitHub] shaofengshi closed pull request #373: KYLIN-3406 Ignore execute output file lose

shaofengshi closed pull request #373: KYLIN-3406 Ignore execute output file lose
URL: https://github.com/apache/kylin/pull/373
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 933594247a..6a63ff45be 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job.dao;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,6 +35,7 @@
 import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
@@ -47,7 +49,8 @@
 public class ExecutableDao {
 
     private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
-    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
+    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(
+            ExecutableOutputPO.class);
     private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
 
     public static ExecutableDao getInstance(KylinConfig config) {
@@ -60,7 +63,7 @@ static ExecutableDao newInstance(KylinConfig config) throws IOException {
     }
 
     // ============================================================================
-    
+
     private ResourceStore store;
 
     private CaseInsensitiveStringCache<ExecutablePO> executableDigestMap;
@@ -76,7 +79,7 @@ static ExecutableDao newInstance(KylinConfig config) throws IOException {
     private AutoReadWriteLock executableOutputDigestMapLock = new AutoReadWriteLock();
 
     private ExecutableDao(KylinConfig config) throws IOException {
-        logger.info("Using metadata url: " + config);
+        logger.info("Using metadata url: {}", config);
         this.store = ResourceStore.getStore(config);
         this.executableDigestMap = new CaseInsensitiveStringCache<>(config, "execute");
         this.executableDigestCrud = new CachedCrudAssist<ExecutablePO>(store, ResourceStore.EXECUTE_RESOURCE_ROOT, "",
@@ -86,7 +89,7 @@ public ExecutablePO reloadAt(String path) {
                 try {
                     ExecutablePO executablePO = readJobResource(path);
                     if (executablePO == null) {
-                        logger.warn("No job found at " + path + ", returning null");
+                        logger.warn("No job found at {}, returning null", path);
                         executableDigestMap.removeLocal(resourceName(path));
                         return null;
                     }
@@ -114,8 +117,9 @@ protected ExecutablePO initEntityAfterReload(ExecutablePO entity, String resourc
         this.executableDigestCrud.reloadAll();
 
         this.executableOutputDigestMap = new CaseInsensitiveStringCache<>(config, "execute_output");
-        this.executableOutputDigestCrud = new CachedCrudAssist<ExecutableOutputPO>(store, ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT,
-                "", ExecutableOutputPO.class, executableOutputDigestMap, false) {
+        this.executableOutputDigestCrud = new CachedCrudAssist<ExecutableOutputPO>(store,
+                ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, "", ExecutableOutputPO.class, executableOutputDigestMap,
+                false) {
             @Override
             public void reloadAll() throws IOException {
                 logger.debug("Reloading execute_output from " + ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
@@ -129,8 +133,8 @@ public void reloadAll() throws IOException {
                             reloadAt(path);
                     }
 
-                    logger.debug("Loaded " + executableOutputDigestMap.size() + " execute_output digest(s) out of " + paths.size()
-                            + " resource");
+                    logger.debug("Loaded {} execute_output digest(s) out of {} resource",
+                            executableOutputDigestMap.size(), paths.size());
                 }
             }
 
@@ -139,7 +143,7 @@ public ExecutableOutputPO reloadAt(String path) {
                 try {
                     ExecutableOutputPO executableOutputPO = readJobOutputResource(path);
                     if (executableOutputPO == null) {
-                        logger.warn("No job output found at " + path + ", returning null");
+                        logger.warn("No job output found at {}, returning null", path);
                         executableOutputDigestMap.removeLocal(resourceName(path));
                         return null;
                     }
@@ -383,8 +387,9 @@ public void deleteJob(String uuid) throws PersistentException {
     }
 
     public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
+        ExecutableOutputPO result = null;
         try {
-            ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
+            result = readJobOutputResource(pathOfJobOutput(uuid));
             if (result == null) {
                 result = new ExecutableOutputPO();
                 result.setUuid(uuid);
@@ -393,7 +398,14 @@ public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
             return result;
         } catch (IOException e) {
             logger.error("error get job output id:" + uuid, e);
-            throw new PersistentException(e);
+            if (e.getCause() instanceof FileNotFoundException) {
+                result = new ExecutableOutputPO();
+                result.setUuid(uuid);
+                result.setStatus(ExecutableState.SUCCEED.name());
+                return result;
+            } else {
+                throw new PersistentException(e);
+            }
         }
     }
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 9e050ef375..14c5ea7cf0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -209,8 +209,6 @@ private void visitFolder(String folderPath, VisitFilter filter, boolean loadCont
     }
 
     private void tuneScanParameters(Scan scan) {
-        // divide by 10 as some resource like dictionary or snapshot can be very large
-        // scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
         scan.setCaching(kylinConfig.getHBaseScanCacheRows());
 
         scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize());
@@ -221,8 +219,7 @@ private void tuneScanParameters(Scan scan) {
         void visit(String childPath, String fullPath, Result hbaseResult) throws IOException;
     }
 
-    private RawResource rawResource(String path, Result hbaseResult, boolean loadContent)
-            throws IOException {
+    private RawResource rawResource(String path, Result hbaseResult, boolean loadContent) {
         long lastModified = getTimestamp(hbaseResult);
         if (loadContent) {
             try {
@@ -304,7 +301,7 @@ protected void putSmallResource(String resPath, ContentWriter content, long ts)
 
             table.put(put);
 
-        } catch (Throwable ex) {
+        } catch (Exception ex) {
             if (pushdown != null)
                 pushdown.rollback();
             throw ex;
@@ -334,8 +331,7 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT
             put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(newTS));
 
             boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
-            logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS
-                    + ", operation result: " + ok);
+            logger.trace("Update row {} from oldTs: {}, to newTs: {}, operation result: {}", resPath, oldTS, newTS, ok);
             if (!ok) {
                 long real = getResourceTimestampImpl(resPath);
                 throw new WriteConflictException(
@@ -344,7 +340,7 @@ protected long checkAndPutResourceImpl(String resPath, byte[] content, long oldT
 
             return newTS;
 
-        } catch (Throwable ex) {
+        } catch (Exception ex) {
             if (pushdown != null)
                 pushdown.rollback();
             throw ex;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services