You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/12 02:51:33 UTC
[kylin] branch master updated: KYLIN-3406 Ignore execute output
file lose
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new ba0fbf9 KYLIN-3406 Ignore execute output file lose
ba0fbf9 is described below
commit ba0fbf9b42034dbc72e916ba3fc59bcaac906f0d
Author: hit-lacus <hi...@126.com>
AuthorDate: Wed Dec 12 09:38:56 2018 +0800
KYLIN-3406 Ignore execute output file lose
---
.../org/apache/kylin/job/dao/ExecutableDao.java | 34 +++++++++++++++-------
.../kylin/storage/hbase/HBaseResourceStore.java | 12 +++-----
2 files changed, 27 insertions(+), 19 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 9335942..6a63ff4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job.dao;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -34,6 +35,7 @@ import org.apache.kylin.common.util.AutoReadWriteLock;
import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.cachesync.CachedCrudAssist;
import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
@@ -47,7 +49,8 @@ import com.google.common.collect.Lists;
public class ExecutableDao {
private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
- private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
+ private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(
+ ExecutableOutputPO.class);
private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
public static ExecutableDao getInstance(KylinConfig config) {
@@ -60,7 +63,7 @@ public class ExecutableDao {
}
// ============================================================================
-
+
private ResourceStore store;
private CaseInsensitiveStringCache<ExecutablePO> executableDigestMap;
@@ -76,7 +79,7 @@ public class ExecutableDao {
private AutoReadWriteLock executableOutputDigestMapLock = new AutoReadWriteLock();
private ExecutableDao(KylinConfig config) throws IOException {
- logger.info("Using metadata url: " + config);
+ logger.info("Using metadata url: {}", config);
this.store = ResourceStore.getStore(config);
this.executableDigestMap = new CaseInsensitiveStringCache<>(config, "execute");
this.executableDigestCrud = new CachedCrudAssist<ExecutablePO>(store, ResourceStore.EXECUTE_RESOURCE_ROOT, "",
@@ -86,7 +89,7 @@ public class ExecutableDao {
try {
ExecutablePO executablePO = readJobResource(path);
if (executablePO == null) {
- logger.warn("No job found at " + path + ", returning null");
+ logger.warn("No job found at {}, returning null", path);
executableDigestMap.removeLocal(resourceName(path));
return null;
}
@@ -114,8 +117,9 @@ public class ExecutableDao {
this.executableDigestCrud.reloadAll();
this.executableOutputDigestMap = new CaseInsensitiveStringCache<>(config, "execute_output");
- this.executableOutputDigestCrud = new CachedCrudAssist<ExecutableOutputPO>(store, ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT,
- "", ExecutableOutputPO.class, executableOutputDigestMap, false) {
+ this.executableOutputDigestCrud = new CachedCrudAssist<ExecutableOutputPO>(store,
+ ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT, "", ExecutableOutputPO.class, executableOutputDigestMap,
+ false) {
@Override
public void reloadAll() throws IOException {
logger.debug("Reloading execute_output from " + ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
@@ -129,8 +133,8 @@ public class ExecutableDao {
reloadAt(path);
}
- logger.debug("Loaded " + executableOutputDigestMap.size() + " execute_output digest(s) out of " + paths.size()
- + " resource");
+ logger.debug("Loaded {} execute_output digest(s) out of {} resource",
+ executableOutputDigestMap.size(), paths.size());
}
}
@@ -139,7 +143,7 @@ public class ExecutableDao {
try {
ExecutableOutputPO executableOutputPO = readJobOutputResource(path);
if (executableOutputPO == null) {
- logger.warn("No job output found at " + path + ", returning null");
+ logger.warn("No job output found at {}, returning null", path);
executableOutputDigestMap.removeLocal(resourceName(path));
return null;
}
@@ -383,8 +387,9 @@ public class ExecutableDao {
}
public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
+ ExecutableOutputPO result = null;
try {
- ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
+ result = readJobOutputResource(pathOfJobOutput(uuid));
if (result == null) {
result = new ExecutableOutputPO();
result.setUuid(uuid);
@@ -393,7 +398,14 @@ public class ExecutableDao {
return result;
} catch (IOException e) {
logger.error("error get job output id:" + uuid, e);
- throw new PersistentException(e);
+ if (e.getCause() instanceof FileNotFoundException) {
+ result = new ExecutableOutputPO();
+ result.setUuid(uuid);
+ result.setStatus(ExecutableState.SUCCEED.name());
+ return result;
+ } else {
+ throw new PersistentException(e);
+ }
}
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index 9e050ef..14c5ea7 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -209,8 +209,6 @@ public class HBaseResourceStore extends PushdownResourceStore {
}
private void tuneScanParameters(Scan scan) {
- // divide by 10 as some resource like dictionary or snapshot can be very large
- // scan.setCaching(kylinConfig.getHBaseScanCacheRows() / 10);
scan.setCaching(kylinConfig.getHBaseScanCacheRows());
scan.setMaxResultSize(kylinConfig.getHBaseScanMaxResultSize());
@@ -221,8 +219,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
void visit(String childPath, String fullPath, Result hbaseResult) throws IOException;
}
- private RawResource rawResource(String path, Result hbaseResult, boolean loadContent)
- throws IOException {
+ private RawResource rawResource(String path, Result hbaseResult, boolean loadContent) {
long lastModified = getTimestamp(hbaseResult);
if (loadContent) {
try {
@@ -304,7 +301,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
table.put(put);
- } catch (Throwable ex) {
+ } catch (Exception ex) {
if (pushdown != null)
pushdown.rollback();
throw ex;
@@ -334,8 +331,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
put.addColumn(B_FAMILY, B_COLUMN_TS, Bytes.toBytes(newTS));
boolean ok = table.checkAndPut(row, B_FAMILY, B_COLUMN_TS, bOldTS, put);
- logger.trace("Update row " + resPath + " from oldTs: " + oldTS + ", to newTs: " + newTS
- + ", operation result: " + ok);
+ logger.trace("Update row {} from oldTs: {}, to newTs: {}, operation result: {}", resPath, oldTS, newTS, ok);
if (!ok) {
long real = getResourceTimestampImpl(resPath);
throw new WriteConflictException(
@@ -344,7 +340,7 @@ public class HBaseResourceStore extends PushdownResourceStore {
return newTS;
- } catch (Throwable ex) {
+ } catch (Exception ex) {
if (pushdown != null)
pushdown.rollback();
throw ex;