You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/01/19 09:02:22 UTC
[03/11] kylin git commit: KYLIN-2389 Improve resource utilization for
DistributedScheduler
KYLIN-2389 Improve resource utilization for DistributedScheduler
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/837bd820
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/837bd820
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/837bd820
Branch: refs/heads/master-hbase1.x
Commit: 837bd8200b250f38fcfb2d221764d5aca0c66403
Parents: e894465
Author: kangkaisen <ka...@163.com>
Authored: Fri Jan 13 19:58:41 2017 +0800
Committer: kangkaisen <ka...@163.com>
Committed: Wed Jan 18 16:14:24 2017 +0800
----------------------------------------------------------------------
.../impl/threadpool/DistributedScheduler.java | 8 +--
.../kylin/job/lock/DistributedJobLock.java | 2 +
.../apache/kylin/rest/service/JobService.java | 45 --------------
.../hbase/util/ZookeeperDistributedJobLock.java | 63 ++++++++++++++++----
4 files changed, 58 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 3436529..84e62d5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -195,13 +195,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
}
}
- //release job lock only when the all tasks of the job finish and the job server keep the cube lock.
+ //release job lock when the job state is neither ready nor running and the job server keeps the cube lock.
private void releaseJobLock(AbstractExecutable executable) {
if (executable instanceof DefaultChainedExecutable) {
String segmentId = executable.getParam(SEGMENT_ID);
ExecutableState state = executable.getStatus();
- if (state == ExecutableState.SUCCEED || state == ExecutableState.ERROR || state == ExecutableState.DISCARDED) {
+ if (state != ExecutableState.READY && state != ExecutableState.RUNNING) {
if (segmentWithLocks.contains(segmentId)) {
logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
jobLock.unlockWithName(segmentId);
@@ -232,7 +232,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
if (executable instanceof DefaultChainedExecutable && executable.getParams().get(SEGMENT_ID).equalsIgnoreCase(segmentId) && !nodeData.equalsIgnoreCase(serverName)) {
try {
logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
- if (jobLock.lockWithName(segmentId, serverName)) {
+ if (!jobLock.isHasLocked(segmentId)) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
break;
@@ -302,7 +302,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
AbstractExecutable executable = executableManager.getJob(id);
if (output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable) {
try {
- if (jobLock.lockWithName(executable.getParam(SEGMENT_ID), serverName)) {
+ if (!jobLock.isHasLocked(executable.getParam(SEGMENT_ID))) {
executableManager.resumeRunningJobForce(executable.getId());
fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
index 9335e56..1c173ec 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -24,6 +24,8 @@ public interface DistributedJobLock extends JobLock {
boolean lockWithName(String name, String serverName);
+ boolean isHasLocked(String segmentId);
+
void unlockWithName(String name);
void watchLock(ExecutorService pool, DoWatchLock doWatch);
http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4709a91..ed24a9d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,8 +19,6 @@
package org.apache.kylin.rest.service;
import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
@@ -56,7 +54,6 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.lock.DistributedJobLock;
import org.apache.kylin.job.lock.JobLock;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
@@ -282,15 +279,12 @@ public class JobService extends BasicService implements InitializingBean {
SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition);
newSeg = getCubeManager().appendSegment(cube, sourcePartition);
- lockSegment(newSeg.getUuid());
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.MERGE) {
newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
- lockSegment(newSeg.getUuid());
job = EngineFactory.createBatchMergeJob(newSeg, submitter);
} else if (buildType == CubeBuildTypeEnum.REFRESH) {
newSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset);
- lockSegment(newSeg.getUuid());
job = EngineFactory.createBatchCubingJob(newSeg, submitter);
} else {
throw new JobException("invalid build type:" + buildType);
@@ -312,7 +306,6 @@ public class JobService extends BasicService implements InitializingBean {
}
}
throw e;
-
}
JobInstance jobInstance = getSingleJobInstance(job);
@@ -454,15 +447,11 @@ public class JobService extends BasicService implements InitializingBean {
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
public void resumeJob(JobInstance job) throws IOException, JobException {
- lockSegment(job.getRelatedSegment());
-
getExecutableManager().resumeJob(job.getId());
}
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
public void rollbackJob(JobInstance job, String stepId) throws IOException, JobException {
- lockSegment(job.getRelatedSegment());
-
getExecutableManager().rollbackJob(job.getId(), stepId);
}
@@ -486,47 +475,15 @@ public class JobService extends BasicService implements InitializingBean {
}
getExecutableManager().discardJob(job.getId());
- //release the segment lock when discarded the job but the job hasn't scheduled
- releaseSegmentLock(job.getRelatedSegment());
-
return job;
}
-
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
public JobInstance pauseJob(JobInstance job) throws IOException, JobException {
getExecutableManager().pauseJob(job.getId());
-
- //release the segment lock when discarded the job but the job hasn't scheduled
- releaseSegmentLock(job.getRelatedSegment());
-
return job;
}
- private void lockSegment(String segmentId) throws JobException {
- if (jobLock instanceof DistributedJobLock) {
- if (!((DistributedJobLock) jobLock).lockWithName(segmentId, getServerName())) {
- throw new JobException("Fail to get the segment lock, the segment may be building in another job server");
- }
- }
- }
-
- private void releaseSegmentLock(String segmentId) {
- if (jobLock instanceof DistributedJobLock) {
- ((DistributedJobLock) jobLock).unlockWithName(segmentId);
- }
- }
-
- private String getServerName() {
- String serverName = null;
- try {
- serverName = InetAddress.getLocalHost().getHostName();
- } catch (UnknownHostException e) {
- logger.error("fail to get the hostname");
- }
- return serverName;
- }
-
public List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList, final Map<String, Output> allOutputs) {
return listAllCubingJobs(cubeName, projectName, statusList, -1L, -1L, allOutputs);
}
@@ -584,6 +541,4 @@ public class JobService extends BasicService implements InitializingBean {
public List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName) {
return listAllCubingJobs(cubeName, projectName, EnumSet.allOf(ExecutableState.class), getExecutableManager().getAllOutputs());
}
-
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/837bd820/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index 5a44cc1..ee7cd50 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -96,6 +96,8 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
* @param serverName the hostname of job server
*
* @return <tt>true</tt> if the segment locked successfully
+ *
+ * @since 2.0
*/
@Override
@@ -110,13 +112,13 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
return false;
}
if (zkClient.checkExists().forPath(lockPath) != null) {
- if (hasLock(serverName, lockPath)) {
+ if (isKeepLock(serverName, lockPath)) {
hasLock = true;
logger.info(serverName + " has kept the lock for segment: " + segmentId);
}
} else {
zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8")));
- if (hasLock(serverName, lockPath)) {
+ if (isKeepLock(serverName, lockPath)) {
hasLock = true;
logger.info(serverName + " lock the segment: " + segmentId + " successfully");
}
@@ -131,19 +133,54 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
return true;
}
- private boolean hasLock(String serverName, String lockPath) {
- String lockServerName = null;
+ /**
+ *
+ * Returns <tt>true</tt> if, the job server is keeping the lock for the lockPath
+ *
+ * @param serverName the hostname of job server
+ *
+ * @param lockPath the zookeeper node path of segment
+ *
+ * @return <tt>true</tt> if the job server is keeping the lock for the lockPath, otherwise
+ * <tt>false</tt>
+ *
+ * @since 2.0
+ */
+
+ private boolean isKeepLock(String serverName, String lockPath) {
try {
if (zkClient.checkExists().forPath(lockPath) != null) {
byte[] data = zkClient.getData().forPath(lockPath);
- lockServerName = new String(data, Charset.forName("UTF-8"));
+ String lockServerName = new String(data, Charset.forName("UTF-8"));
+ return lockServerName.equalsIgnoreCase(serverName);
}
} catch (Exception e) {
logger.error("fail to get the serverName for the path: " + lockPath, e);
}
- if(lockServerName == null)
- return false;
- return lockServerName.equalsIgnoreCase(serverName);
+ return false;
+ }
+
+ /**
+ *
+ * Returns <tt>true</tt> if, and only if, the segment has been locked.
+ *
+ * @param segmentId the id of the segment whose lock state is checked.
+ *
+ * @return <tt>true</tt> if the segment has been locked, otherwise
+ * <tt>false</tt>
+ *
+ * @since 2.0
+ */
+
+ @Override
+ public boolean isHasLocked(String segmentId) {
+ String lockPath = getLockPath(segmentId);
+ try {
+ return zkClient.checkExists().forPath(lockPath) != null;
+ } catch (Exception e) {
+ logger.error("fail to get the path: " + lockPath, e);
+ }
+ return false;
}
/**
@@ -151,7 +188,9 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
*
* <p> the segment related zookeeper node will be deleted.
*
- * @param segmentId the name of segment need to release the lock
+ * @param segmentId the id of segment need to release the lock
+ *
+ * @since 2.0
*/
@Override
@@ -177,7 +216,10 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
* in order to when one job server is down, other job server can take over the running jobs.
*
* @param pool the threadPool watching the zookeeper node change
+ *
* @param doWatch do the concrete action with the zookeeper node path and zookeeper node data
+ *
+ * @since 2.0
*/
@Override
@@ -229,9 +271,8 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
@Override
public void unlock() {
-
}
-
+
public void close() {
try {
childrenCache.close();