You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/04 08:31:59 UTC
[14/30] kylin git commit: Revert "Revert "KYLIN-1726 allow job discard itself""
Revert "Revert "KYLIN-1726 allow job discard itself""
This reverts commit 3ae2549ba89e3a2c8ed94a2089678227cf78312d.
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0de0239
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0de0239
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0de0239
Branch: refs/heads/master-hbase1.x
Commit: f0de02392f4808e08432417cb00f2ab5fa829055
Parents: de2f4e2
Author: shaofengshi <sh...@apache.org>
Authored: Sat Sep 24 14:57:36 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Sep 27 10:17:40 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 14 ++-----
.../kylin/job/execution/AbstractExecutable.java | 2 +
.../job/execution/DefaultChainedExecutable.java | 2 +
.../kylin/job/execution/ExecuteResult.java | 4 ++
.../kylin/job/DiscardedTestExecutable.java | 41 ++++++++++++++++++++
.../impl/threadpool/DefaultSchedulerTest.java | 16 ++++++++
6 files changed, 68 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0de0239/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 5a4b07c..3a327f9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -357,34 +357,26 @@ public class CubeManager implements IRealizationProvider {
Iterator<CubeSegment> iterator = newSegs.iterator();
while (iterator.hasNext()) {
CubeSegment currentSeg = iterator.next();
- boolean found = false;
for (CubeSegment toRemoveSeg : update.getToRemoveSegs()) {
if (currentSeg.getUuid().equals(toRemoveSeg.getUuid())) {
+ logger.info("Remove segment " + currentSeg.toString());
+ toRemoveResources.add(currentSeg.getStatisticsResourcePath());
iterator.remove();
- toRemoveResources.add(toRemoveSeg.getStatisticsResourcePath());
- found = true;
+ break;
}
}
- if (found == false) {
- logger.error("Segment '" + currentSeg.getName() + "' doesn't exist for remove.");
- }
}
}
if (update.getToUpdateSegs() != null) {
for (CubeSegment segment : update.getToUpdateSegs()) {
- boolean found = false;
for (int i = 0; i < newSegs.size(); i++) {
if (newSegs.get(i).getUuid().equals(segment.getUuid())) {
newSegs.set(i, segment);
- found = true;
break;
}
}
- if (found == false) {
- logger.error("Segment '" + segment.getName() + "' doesn't exist for update.");
- }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0de0239/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 90e4d3c..b4ca469 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -74,6 +74,8 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
if (!isDiscarded()) {
if (result.succeed()) {
executableManager.updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
+ } else if (result.discarded()) {
+ executableManager.updateJobOutput(getId(), ExecutableState.DISCARDED, null, result.output());
} else {
executableManager.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0de0239/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 39a5f4f..5a57b05 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -119,6 +119,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
} else {
jobService.updateJobOutput(getId(), ExecutableState.READY, null, null);
}
+ } else if (result.discarded()) {
+ jobService.updateJobOutput(getId(), ExecutableState.DISCARDED, null, result.output());
} else {
setEndTime(System.currentTimeMillis());
jobService.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0de0239/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
index 760a574..2347e7d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecuteResult.java
@@ -49,6 +49,10 @@ public final class ExecuteResult {
return state == State.SUCCEED;
}
+ public boolean discarded() {
+ return state == State.DISCARDED;
+ }
+
public String output() {
return output;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0de0239/core-job/src/test/java/org/apache/kylin/job/DiscardedTestExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/DiscardedTestExecutable.java b/core-job/src/test/java/org/apache/kylin/job/DiscardedTestExecutable.java
new file mode 100644
index 0000000..9362e18
--- /dev/null
+++ b/core-job/src/test/java/org/apache/kylin/job/DiscardedTestExecutable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+
+/**
+ */
+public class DiscardedTestExecutable extends BaseTestExecutable {
+
+ public DiscardedTestExecutable() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ return new ExecuteResult(ExecuteResult.State.DISCARDED, "discarded");
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0de0239/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index df521f9..2baf10a 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.kylin.job.DiscardedTestExecutable;
import org.apache.kylin.job.BaseTestExecutable;
import org.apache.kylin.job.ErrorTestExecutable;
import org.apache.kylin.job.FailedTestExecutable;
@@ -83,6 +84,21 @@ public class DefaultSchedulerTest extends BaseSchedulerTest {
}
@Test
+ public void testSucceedAndDiscarded() throws Exception {
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ BaseTestExecutable task1 = new SucceedTestExecutable();
+ BaseTestExecutable task2 = new DiscardedTestExecutable();
+ job.addTask(task1);
+ job.addTask(task2);
+ jobService.addJob(job);
+ waitForJobFinish(job.getId());
+ Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+ Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+ Assert.assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task2.getId()).getState());
+ }
+
+
+ @Test
public void testSucceedAndError() throws Exception {
DefaultChainedExecutable job = new DefaultChainedExecutable();
BaseTestExecutable task1 = new ErrorTestExecutable();