You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/29 10:34:44 UTC

[1/2] incubator-kylin git commit: KYLIN-911

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 b45ed11fa -> 584b02b3d


KYLIN-911


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/584b02b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/584b02b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/584b02b3

Branch: refs/heads/0.8
Commit: 584b02b3d787aff6855fa050609c955cc5d7e048
Parents: 6c4613b
Author: qianhao.zhou <qi...@ebay.com>
Authored: Tue Jul 28 17:23:21 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jul 29 16:35:33 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/engine/mr/CubingJob.java   | 28 +++++++------
 .../apache/kylin/rest/service/JobService.java   | 43 +++++++++-----------
 2 files changed, 35 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/584b02b3/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 7251730..fd65209 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -18,12 +18,7 @@
 
 package org.apache.kylin.engine.mr;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -31,12 +26,15 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.execution.*;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
 
 /**
  */
@@ -83,11 +81,15 @@ public class CubingJob extends DefaultChainedExecutable {
         return getParam(CUBE_INSTANCE_NAME);
     }
 
+    void setSegmentIds(List<String> segmentIds) {
+        setParam(SEGMENT_ID, StringUtils.join(segmentIds, ","));
+    }
+    
     void setSegmentId(String segmentId) {
         setParam(SEGMENT_ID, segmentId);
     }
 
-    public String getSegmentId() {
+    public String getSegmentIds() {
         return getParam(SEGMENT_ID);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/584b02b3/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index bb17bd2..baf7edb 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -18,16 +18,15 @@
 
 package org.apache.kylin.rest.service;
 
-import java.io.IOException;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.kylin.cube.CubeUpdate;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.BuildEngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
@@ -50,11 +49,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Component;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.*;
 
 /**
  * @author ysong1
@@ -181,7 +177,7 @@ public class JobService extends BasicService {
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
         result.setRelatedCube(cubeJob.getCubeName());
-        result.setRelatedSegment(cubeJob.getSegmentId());
+        result.setRelatedSegment(cubeJob.getSegmentIds());
         result.setLastModified(cubeJob.getLastModified());
         result.setSubmitter(cubeJob.getSubmitter());
         result.setUuid(cubeJob.getId());
@@ -206,7 +202,7 @@ public class JobService extends BasicService {
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
         result.setRelatedCube(cubeJob.getCubeName());
-        result.setRelatedSegment(cubeJob.getSegmentId());
+        result.setRelatedSegment(cubeJob.getSegmentIds());
         result.setLastModified(output.getLastModified());
         result.setSubmitter(cubeJob.getSubmitter());
         result.setUuid(cubeJob.getId());
@@ -295,16 +291,17 @@ public class JobService extends BasicService {
         //        for (BuildCubeJob cubeJob: listAllCubingJobs(cube.getName(), null, EnumSet.of(ExecutableState.READY, ExecutableState.RUNNING))) {
         //            getExecutableManager().stopJob(cubeJob.getId());
         //        }
-        final String segmentId = job.getRelatedSegment();
         CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube());
-        final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
-        if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) {
-            // Remove this segments
-            CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
-            cubeBuilder.setToRemoveSegs(segment);
-            getCubeManager().updateCube(cubeBuilder);
+        final String segmentIds = job.getRelatedSegment();
+        for (String segmentId: StringUtils.split(segmentIds)) {
+            final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
+            if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) {
+                // Remove this segments
+                CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+                cubeBuilder.setToRemoveSegs(segment);
+                getCubeManager().updateCube(cubeBuilder);
+            }
         }
-
         getExecutableManager().discardJob(job.getId());
         return job;
     }


[2/2] incubator-kylin git commit: rename ITDefaultSchedulerTest to DefaultSchedulerTest, categorize it into unit test

Posted by qh...@apache.org.
rename ITDefaultSchedulerTest to DefaultSchedulerTest, categorize it into unit test


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6c4613bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6c4613bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6c4613bf

Branch: refs/heads/0.8
Commit: 6c4613bf30cc1d1b29d3b28c61b4d124de26bf29
Parents: b45ed11
Author: qianhao.zhou <qi...@ebay.com>
Authored: Wed Jul 29 16:34:52 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Jul 29 16:35:33 2015 +0800

----------------------------------------------------------------------
 .../job/impl/threadpool/BaseSchedulerTest.java  |   8 +-
 .../impl/threadpool/DefaultSchedulerTest.java   | 100 +++++++++++++++++++
 .../impl/threadpool/ITDefaultSchedulerTest.java | 100 -------------------
 3 files changed, 104 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c4613bf/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
index 1a503e1..22aca5c 100644
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/BaseSchedulerTest.java
@@ -19,14 +19,14 @@
 package org.apache.kylin.job.impl.threadpool;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.lock.MockJobLock;
 import org.apache.kylin.job.manager.ExecutableManager;
-import org.apache.kylin.storage.hbase.HBaseMetadataTestCase;
-import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -36,7 +36,7 @@ import java.lang.reflect.Modifier;
 
 /**
  */
-public abstract class BaseSchedulerTest extends HBaseMetadataTestCase {
+public abstract class BaseSchedulerTest extends LocalFileMetadataTestCase {
 
     private DefaultScheduler scheduler;
 
@@ -95,7 +95,7 @@ public abstract class BaseSchedulerTest extends HBaseMetadataTestCase {
         setFinalStatic(ExecutableConstants.class.getField("DEFAULT_SCHEDULER_INTERVAL_SECONDS"), 10);
         jobService = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
         scheduler = DefaultScheduler.getInstance();
-        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new ZookeeperJobLock());
+        scheduler.init(new JobEngineConfig(KylinConfig.getInstanceFromEnv()), new MockJobLock());
         if (!scheduler.hasStarted()) {
             throw new RuntimeException("scheduler has not been started");
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c4613bf/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
new file mode 100644
index 0000000..debe9a4
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import org.apache.kylin.job.*;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ */
+public class DefaultSchedulerTest extends BaseSchedulerTest {
+
+    @Test
+    public void testSingleTaskJob() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+    }
+
+    @Test
+    public void testSucceed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+    }
+    @Test
+    public void testSucceedAndFailed() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SucceedTestExecutable();
+        BaseTestExecutable task2 = new FailedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+    }
+    @Test
+    public void testSucceedAndError() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new ErrorTestExecutable();
+        BaseTestExecutable task2 = new SucceedTestExecutable();
+        job.addTask(task1);
+        job.addTask(task2);
+        jobService.addJob(job);
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
+        assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
+    }
+
+    @Test
+    public void testDiscard() throws Exception {
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        BaseTestExecutable task1 = new SelfStopExecutable();
+        job.addTask(task1);
+        jobService.addJob(job);
+        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+        jobService.discardJob(job.getId());
+        waitForJobFinish(job.getId());
+        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
+        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
+        Thread.sleep(5000);
+        System.out.println(job);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6c4613bf/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java b/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
deleted file mode 100644
index 7c2218b..0000000
--- a/job/src/test/java/org/apache/kylin/job/impl/threadpool/ITDefaultSchedulerTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.impl.threadpool;
-
-import org.apache.kylin.job.*;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.Output;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- */
-public class ITDefaultSchedulerTest extends BaseSchedulerTest {
-
-    @Test
-    public void testSingleTaskJob() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-    }
-
-    @Test
-    public void testSucceed() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        BaseTestExecutable task2 = new SucceedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
-    }
-    @Test
-    public void testSucceedAndFailed() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SucceedTestExecutable();
-        BaseTestExecutable task2 = new FailedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
-    }
-    @Test
-    public void testSucceedAndError() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new ErrorTestExecutable();
-        BaseTestExecutable task2 = new SucceedTestExecutable();
-        job.addTask(task1);
-        job.addTask(task2);
-        jobService.addJob(job);
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.ERROR, jobService.getOutput(task1.getId()).getState());
-        assertEquals(ExecutableState.READY, jobService.getOutput(task2.getId()).getState());
-    }
-
-    @Test
-    public void testDiscard() throws Exception {
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        BaseTestExecutable task1 = new SelfStopExecutable();
-        job.addTask(task1);
-        jobService.addJob(job);
-        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
-        jobService.discardJob(job.getId());
-        waitForJobFinish(job.getId());
-        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(job.getId()).getState());
-        assertEquals(ExecutableState.DISCARDED, jobService.getOutput(task1.getId()).getState());
-        Thread.sleep(5000);
-        System.out.println(job);
-    }
-
-}