You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/12 02:51:33 UTC

[kylin] branch master updated: KYLIN-3406 Ignore execute output file lose

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new ba0fbf9  KYLIN-3406 Ignore execute output file lose
ba0fbf9 is described below

commit ba0fbf9b42034dbc72e916ba3fc59bcaac906f0d
Author: hit-lacus <hi...@126.com>
AuthorDate: Wed Dec 12 09:38:56 2018 +0800

    KYLIN-3406 Ignore execute output file lose
---
 .../org/apache/kylin/job/dao/ExecutableDao.java    | 34 +++++++++++++++-------
 .../kylin/storage/hbase/HBaseResourceStore.java    | 12 +++-----
 2 files changed, 27 insertions(+), 19 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 9335942..6a63ff4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.job.dao;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -34,6 +35,7 @@ import org.apache.kylin.common.util.AutoReadWriteLock;
 import org.apache.kylin.job.exception.PersistentException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.metadata.cachesync.Broadcaster;
 import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
 import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
@@ -47,7 +49,8 @@ import com.google.common.collect.Lists;
 public class ExecutableDao {
 
     private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
-    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
+    private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(
+            ExecutableOutputPO.class);
     private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
 
     public static ExecutableDao getInstance(KylinConfig config) {
@@ -60,7 +63,7 @@ public class ExecutableDao {
     }
 
     // ============================================================================
-    
+
     private ResourceStore store;
 
     private CaseInsensitiveStringCache<ExecutablePO> executableDigestMap;
@@ -76,7 +79,7 @@ public class ExecutableDao {
     private AutoReadWriteLock executableOutputDigestMapLock = new AutoReadWriteLock();
 
     private ExecutableDao(KylinConfig config) throws IOException {
-        logger.info("Using metadata url: " + config);
+        logger.info("Using metadata url: {}", config);
         this.store = ResourceStore.getStore(config);
         this.executableDigestMap = new CaseInsensitiveStringCache<>(config, "execute");
         this.executableDigestCrud = new CachedCrudAssist<ExecutablePO>(store, ResourceStore.EXECUTE_RESOURCE_ROOT, "",
@@ -86,7 +89,7 @@ public class ExecutableDao {
                 try {
                     ExecutablePO executablePO = readJobResource(path);
                     if (executablePO == null) {
-                        logger.warn("No job found at " + path + ", returning null");
+                        logger.warn("No job found at {}, returning null", path);
                         executableDigestMap.removeLocal(resourceName(path));
                         return null;
                     }
@@ -114,8 +117,9 @@ public class ExecutableDao {
         this.executableDigestCrud.reloadAll();
 
         this.executableOutputDigestMap = new CaseInsensitiveStringCache<>(config, "execute_output");
-        this.executableOutputDigestCrud = new CachedCrudAssist<ExecutableOutputPO>(store, ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT,
-                "", ExecutableOutputPO.class, executableOutputDigestMap, false) {
+        this.executableOutputDigestCrud = new CachedCrudAssist<ExecutableOutputPO>(store,
+                ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, "", ExecutableOutputPO.class, executableOutputDigestMap,
+                false) {
             @Override
             public void reloadAll() throws IOException {
                 logger.debug("Reloading execute_output from " + ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
@@ -129,8 +133,8 @@ public class ExecutableDao {
                             reloadAt(path);
                     }
 
-                    logger.debug("Loaded " + executableOutputDigestMap.size() + " execute_output digest(s) out of " + paths.size()
-                            + " resource");
+                    logger.debug("Loaded {} execute_output digest(s) out of {} resource",
+                            executableOutputDigestMap.size(), paths.size());
                 }
             }
 
@@ -139,7 +143,7 @@ public class ExecutableDao {
                 try {
                     ExecutableOutputPO executableOutputPO = readJobOutputResource(path);
                     if (executableOutputPO == null) {
-                        logger.warn("No job output found at " + path + ", returning null");
+                        logger.warn("No job output found at {}, returning null", path);
                         executableOutputDigestMap.removeLocal(resourceName(path));
                         return null;
                     }
@@ -383,8 +387,9 @@ public class ExecutableDao {
     }
 
     public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
+        ExecutableOutputPO result = null;
         try {
-            ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
+            result = readJobOutputResource(pathOfJobOutput(uuid));
             if (result == null) {
                 result = new ExecutableOutputPO();
                 result.setUuid(uuid);
@@ -393,7 +398,14 @@ public class ExecutableDao {
             return result;
         } catch (IOException e) {
             logger.error("error get job output id:" + uuid, e);
-            throw new PersistentException(e);
+            if (e.getCause() instanceof FileNotFoundException) {
+                result = new ExecutableOutputPO();
+                result.setUuid(uuid);
+                result.setStatus(ExecutableState.SUCCEED.name());
+                return result;
+            } else {
+                throw new PersistentException(e);
+            }
         }
     }
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 9e050ef..14c5ea7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -209,8 +209,6 @@ public class HBaseResourceStore extends PushdownResourceStore {
     }
 
     private void tuneScanParameters(Scan scan) {
-        // divide by 10 as some resource like dictionary or snapshot can be very large
-        // scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
         scan.setCaching(kylinConfig.getHBaseScanCacheRows());
 
         scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize());
@@ -221,8 +219,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
         void visit(String childPath, String fullPath, Result hbaseResult) throws IOException;
     }
 
-    private RawResource rawResource(String path, Result hbaseResult, boolean loadContent)
-            throws IOException {
+    private RawResource rawResource(String path, Result hbaseResult, boolean loadContent) {
         long lastModified = getTimestamp(hbaseResult);
         if (loadContent) {
             try {
@@ -304,7 +301,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
 
             table.put(put);
 
-        } catch (Throwable ex) {
+        } catch (Exception ex) {
             if (pushdown != null)
                 pushdown.rollback();
             throw ex;
@@ -334,8 +331,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
             put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(newTS));
 
             boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
-            logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS
-                    + ", operation result: " + ok);
+            logger.trace("Update row {} from oldTs: {}, to newTs: {}, operation result: {}", resPath, oldTS, newTS, ok);
             if (!ok) {
                 long real = getResourceTimestampImpl(resPath);
                 throw new WriteConflictException(
@@ -344,7 +340,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
 
             return newTS;
 
-        } catch (Throwable ex) {
+        } catch (Exception ex) {
             if (pushdown != null)
                 pushdown.rollback();
             throw ex;