You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/25 06:31:14 UTC

[kylin] branch 2.4.x updated (5c10786 -> d9da8f5)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch 2.4.x
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 5c10786  KYLIN-3613, UnknownHostException at 'Create HTable' step when use standalone hbase cluster
     new 1c0afe9  KYLIN-3645 clean table metadata when drop project
     new d9da8f5  KYLIN-3647 Fix inconsistent states of job and its sub-task

The two revisions listed above as "new" are entirely new to this
repository and are described in separate emails.  Revisions listed
as "add" (none in this push) were already present in the repository
and have only been added to this reference.


Summary of changes:
 .../kylin/job/execution/AbstractExecutable.java    | 20 ++++++-------
 .../job/execution/DefaultChainedExecutable.java    | 34 ++--------------------
 .../kylin/job/execution/ExecutableManager.java     | 26 +++++++++++++++--
 .../job/impl/threadpool/DefaultScheduler.java      |  4 +++
 ...utable.java => PersistExceptionExecutable.java} | 12 ++++----
 .../job/impl/threadpool/DefaultSchedulerTest.java  |  2 +-
 .../apache/kylin/rest/service/ProjectService.java  | 11 +++++--
 7 files changed, 56 insertions(+), 53 deletions(-)
 copy core-job/src/test/java/org/apache/kylin/job/{RunningTestExecutable.java => PersistExceptionExecutable.java} (81%)


[kylin] 02/02: KYLIN-3647 Fix inconsistent states of job and its sub-task

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.4.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d9da8f57361753a7c4b6f3065ed9156cada9b500
Author: Ni Chunen <ch...@kyligence.io>
AuthorDate: Fri Oct 19 14:25:10 2018 +0800

    KYLIN-3647 Fix inconsistent states of job and its sub-task
---
 .../kylin/job/execution/AbstractExecutable.java    | 20 +++++------
 .../job/execution/DefaultChainedExecutable.java    | 34 ++-----------------
 .../kylin/job/execution/ExecutableManager.java     | 26 +++++++++++++--
 .../job/impl/threadpool/DefaultScheduler.java      |  4 +++
 .../kylin/job/PersistExceptionExecutable.java      | 39 ++++++++++++++++++++++
 .../job/impl/threadpool/DefaultSchedulerTest.java  |  2 +-
 6 files changed, 81 insertions(+), 44 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 1a534e1..9cb4932 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -98,7 +98,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                 onExecuteFinished(result, executableContext);
             } catch (Exception e) {
                 logger.error(nRetry + "th retries for onExecuteFinished fails due to {}", e);
-                if (isMetaDataPersistException(e)) {
+                if (isMetaDataPersistException(e, 5)) {
                     exception = e;
                     try {
                         Thread.sleep(1000L * (long) Math.pow(4, nRetry));
@@ -204,14 +204,21 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         new MailService(context.getConfig()).sendMail(users, title, content);
     }
 
-    private boolean isMetaDataPersistException(Exception e) {
+    protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException, PersistentException;
+
+    @Override
+    public void cleanup() throws ExecuteException {
+
+    }
+
+    public static boolean isMetaDataPersistException(Exception e, final int maxDepth) {
         if (e instanceof PersistentException) {
             return true;
         }
 
         Throwable t = e.getCause();
         int depth = 0;
-        while (t != null && depth < 5) {
+        while (t != null && depth < maxDepth) {
             depth++;
             if (t instanceof PersistentException) {
                 return true;
@@ -221,13 +228,6 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return false;
     }
 
-    protected abstract ExecuteResult doWork(ExecutableContext context) throws ExecuteException;
-
-    @Override
-    public void cleanup() throws ExecuteException {
-
-    }
-
     @Override
     public boolean isRunnable() {
         return this.getStatus() == ExecutableState.READY;
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 2297be7..8259025 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -111,9 +111,9 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                             "There shouldn't be a running subtask[jobId: {}, jobName: {}], \n"
                                     + "it might cause endless state, will retry to fetch subtask's state.",
                             task.getId(), task.getName());
-                    boolean retryRet = retryFetchTaskStatus(task);
-                    if (false == retryRet)
-                        hasError = true;
+                    getManager().updateJobOutput(task.getId(), ExecutableState.ERROR, null,
+                            "killed due to inconsistent state");
+                    hasError = true;
                 }
 
                 final ExecutableState status = task.getStatus();
@@ -173,34 +173,6 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         this.subTasks.add(executable);
     }
 
-    private boolean retryFetchTaskStatus(Executable task) {
-        boolean hasRunning = false;
-        int retry = 1;
-        while (retry <= 10) {
-            ExecutableState retryState = task.getStatus();
-            if (retryState == ExecutableState.RUNNING) {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    logger.error("Failed to Sleep: ", e);
-                }
-                hasRunning = true;
-                logger.error("With {} times retry, it's state is still RUNNING", retry);
-            } else {
-                logger.info("With {} times retry, status is changed to: {}", retry, retryState);
-                hasRunning = false;
-                break;
-            }
-            retry++;
-        }
-        if (hasRunning) {
-            logger.error("Parent task: {} is finished, but it's subtask: {}'s state is still RUNNING \n"
-                    + ", mark parent task failed.", getName(), task.getName());
-            return false;
-        }
-        return true;
-    }
-    
     @Override
     public int getDefaultPriority() {
         return DEFAULT_PRIORITY;
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index abcb048..983c5ae 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -418,7 +418,6 @@ public class ExecutableManager {
     public void forceKillJob(String jobId) {
         try {
             final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
-            jobOutput.setStatus(ExecutableState.ERROR.toString());
             List<ExecutablePO> tasks = executableDao.getJob(jobId).getTasks();
 
             for (ExecutablePO task : tasks) {
@@ -429,12 +428,31 @@ public class ExecutableManager {
                 }
                 break;
             }
-            executableDao.updateJobOutput(jobOutput);
+
+            if (!jobOutput.getStatus().equals(ExecutableState.ERROR.toString())) {
+                jobOutput.setStatus(ExecutableState.ERROR.toString());
+                executableDao.updateJobOutput(jobOutput);
+            }
         } catch (PersistentException e) {
             throw new RuntimeException(e);
         }
     }
 
+    public void forceKillJobWithRetry(String jobId) {
+        boolean done = false;
+
+        while (!done) {
+            try {
+                forceKillJob(jobId);
+                done = true;
+            } catch (RuntimeException e) {
+                if (!(e.getCause() instanceof PersistentException)) {
+                    done = true;
+                }
+            }
+        }
+    }
+
     //for migration only
     //TODO delete when migration finished
     public void resetJobOutput(String jobId, ExecutableState state, String output) {
@@ -451,6 +469,10 @@ public class ExecutableManager {
     }
 
     public void addJobInfo(String id, Map<String, String> info) {
+        if (Thread.currentThread().isInterrupted()) {
+            throw new RuntimeException("Current thread is interrupted, aborting");
+        }
+
         if (info == null) {
             return;
         }
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index c566408..6d40be8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -114,6 +114,10 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
             } catch (ExecuteException e) {
                 logger.error("ExecuteException job:" + executable.getId(), e);
             } catch (Exception e) {
+                if (AbstractExecutable.isMetaDataPersistException(e, 5)) {
+                    // Job fail due to PersistException
+                    ExecutableManager.getInstance(jobEngineConfig.getConfig()).forceKillJobWithRetry(executable.getId());
+                }
                 logger.error("unknown error execute job:" + executable.getId(), e);
             } finally {
                 context.removeRunningJob(executable);
diff --git a/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
new file mode 100644
index 0000000..78b393c
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/PersistExceptionExecutable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+public class PersistExceptionExecutable extends BaseTestExecutable {
+    public PersistExceptionExecutable() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws PersistentException {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+
+        throw new PersistentException("persistent exception");
+    }
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index 3c1a7ea..a1644a9 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -151,7 +151,7 @@ public class DefaultSchedulerTest extends BaseSchedulerTest {
         waitForJobFinish(job.getId(), 10000);
         Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState());
         Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.RUNNING, execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState());
     }
 
     @SuppressWarnings("rawtypes")


[kylin] 01/02: KYLIN-3645 clean table metadata when drop project

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.4.x
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1c0afe9ba13b7d79228e4cb3aba8a36074652b28
Author: Rongchuan Jin <ro...@RongchuanJins-MacBook-Pro.local>
AuthorDate: Wed Oct 24 10:56:21 2018 +0800

    KYLIN-3645 clean table metadata when drop project
---
 .../java/org/apache/kylin/rest/service/ProjectService.java    | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index 7d56fff..22ee95e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -23,9 +23,8 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
-
+import java.util.Set;
 import javax.annotation.Nullable;
-
 import org.apache.directory.api.util.Strings;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.realization.RealizationType;
@@ -68,6 +67,9 @@ public class ProjectService extends BasicService {
     @Autowired
     private AclEvaluate aclEvaluate;
 
+    @Autowired
+    private TableService tableService;
+
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public ProjectInstance createProject(ProjectInstance newProject) throws IOException {
         Message msg = MsgPicker.getMsg();
@@ -131,8 +133,11 @@ public class ProjectService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public void deleteProject(String projectName, ProjectInstance project) throws IOException {
+        Set<String> tables = project.getTables();
+        for (String table : tables) {
+            tableService.unloadHiveTable(table, projectName);
+        }
         getProjectManager().dropProject(projectName);
-
         accessService.clean(project, true);
     }