You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/24 11:39:59 UTC
[kylin] branch master updated: KYLIN-4249:DistributedScheduler can
selectively assign task nodes according to cube extra configuration
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new b72781c KYLIN-4249:DistributedScheduler can selectively assign task nodes according to cube extra configuration
b72781c is described below
commit b72781c3d89e3c5fd5bc1f9363d1dc4839e3c3d2
Author: zhangxiang17 <zh...@58.com>
AuthorDate: Mon Nov 11 13:02:30 2019 +0800
KYLIN-4249:DistributedScheduler can selectively assign task nodes according to cube extra configuration
---
.../org/apache/kylin/common/KylinConfigBase.java | 32 ++++++++++++++++-
.../org/apache/kylin/common}/util/ToolUtil.java | 38 +++++++++++++++++++-
.../org/apache/kylin/common/util/ToolUtilTest.java | 41 ++++++++++++++++++++++
.../job/impl/threadpool/DistributedScheduler.java | 9 ++++-
.../apache/kylin/tool/AbstractInfoExtractor.java | 2 +-
.../org/apache/kylin/tool/DiagnosisInfoCLI.java | 2 +-
.../org/apache/kylin/tool/JobDiagnosisInfoCLI.java | 2 +-
.../org/apache/kylin/tool/KylinLogExtractor.java | 2 +-
8 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 91ac9ff..73b8f01 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -731,6 +731,36 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-genetic", "23"));
}
+ /**
+  * Returns the servers configured under {@code kylin.cube.schedule.assigned-servers},
+  * i.e. the nodes that the scheduler may pick for jobs using this configuration.
+  *
+  * @return the configured server list; an empty array is supplied as the default
+  *         value for the lookup when the property is not set
+  */
+ public String[] getAssignedServers() {
+     final String[] noServers = new String[] {};
+     return getOptionalStringArray("kylin.cube.schedule.assigned-servers", noServers);
+ }
+
+ /**
+  * Determines whether a node, identified by one or more names/addresses, is one of
+  * the servers returned by {@link #getAssignedServers()}.
+  * <p>
+  * When no assigned servers are configured, every node is accepted. Comparison of
+  * server identifiers is case-insensitive.
+  *
+  * @param targetServers identifiers of the node to check (for example its host name
+  *                      and its IP address); may be {@code null} or empty
+  * @return {@code true} if no servers are assigned or at least one target matches an
+  *         assigned server; {@code false} otherwise
+  */
+ public boolean isOnAssignedServer(String... targetServers) {
+
+     String[] servers = this.getAssignedServers();
+     if (null == servers || servers.length == 0) {
+         return true;
+     }
+
+     // A caller can pass an explicit null array through the varargs parameter;
+     // with nothing to compare against, the node cannot be on the assigned list.
+     if (null == targetServers) {
+         return false;
+     }
+
+     for (String s : servers) {
+         if (null == s) {
+             continue;
+         }
+         for (String ts : targetServers) {
+             // equalsIgnoreCase(null) is false, so null targets are simply ignored
+             if (s.equalsIgnoreCase(ts)) {
+                 return true;
+             }
+         }
+     }
+     return false;
+ }
+
// ============================================================================
// JOB
// ============================================================================
@@ -1526,7 +1556,7 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
}
- public boolean isSparkCardinalityEnabled(){
+ public boolean isSparkCardinalityEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java
similarity index 71%
rename from tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java
rename to core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java
index 842beb2..35075a6 100644
--- a/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java
@@ -17,7 +17,7 @@
*
*/
-package org.apache.kylin.tool.util;
+package org.apache.kylin.common.util;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
@@ -26,8 +26,13 @@ import org.apache.kylin.common.persistence.ResourceStore;
import java.io.File;
import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
import java.net.UnknownHostException;
+import java.util.Enumeration;
import java.util.Map;
public class ToolUtil {
@@ -90,4 +95,35 @@ public class ToolUtil {
}
return hostname;
}
+
+ /**
+  * Scans the network interfaces of this machine and returns the first address that
+  * is not a loopback address and passes the IP-version filter.
+  * <p>
+  * Despite their names, the two flags act as exclusion filters rather than
+  * preferences: with {@code preferIpv4} set, IPv6 addresses are skipped; with
+  * {@code preferIPv6} set, IPv4 addresses are skipped. Setting both skips every
+  * address, so the result is {@code null}.
+  *
+  * @param preferIpv4 when {@code true}, IPv6 addresses are never returned
+  * @param preferIPv6 when {@code true}, IPv4 addresses are never returned
+  * @return the first matching address, or {@code null} if no address qualifies
+  * @throws SocketException if the network interfaces cannot be enumerated
+  */
+ public static InetAddress getFirstNonLoopbackAddress(boolean preferIpv4, boolean preferIPv6)
+         throws SocketException {
+     Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+     if (interfaces == null) {
+         // Older JDKs document a null return when the machine has no interfaces.
+         return null;
+     }
+     while (interfaces.hasMoreElements()) {
+         Enumeration<InetAddress> addresses = interfaces.nextElement().getInetAddresses();
+         while (addresses.hasMoreElements()) {
+             InetAddress addr = addresses.nextElement();
+             if (addr.isLoopbackAddress()) {
+                 continue;
+             }
+             if (addr instanceof Inet4Address && !preferIPv6) {
+                 return addr;
+             }
+             if (addr instanceof Inet6Address && !preferIpv4) {
+                 return addr;
+             }
+         }
+     }
+     return null;
+ }
+
+ public static InetAddress getFirstIPV4NonLoopBackAddress() throws SocketException {
+ return getFirstNonLoopbackAddress(true, false);
+ }
+
}
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java
new file mode 100644
index 0000000..8161d43
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.kylin.common.util;
+
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ToolUtilTest {
+
+    /**
+     * Verifies that the address lookup yields a non-null, non-loopback IPv4 address.
+     * Requires the machine running the test to have such an address.
+     */
+    @Test
+    public void testGetFirstIPV4NonLoopBackAddress() throws Exception {
+
+        InetAddress localIp = ToolUtil.getFirstIPV4NonLoopBackAddress();
+        // Check for null before dereferencing, so a missing address fails with the
+        // assertion message instead of a NullPointerException.
+        Assert.assertNotNull("localIp is null", localIp);
+        Assert.assertFalse(localIp.isLoopbackAddress());
+        Assert.assertTrue(localIp instanceof Inet4Address);
+        Assert.assertFalse(localIp instanceof Inet6Address);
+    }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 8ea1533..4df9221 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.common.util.ToolUtil;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.ExecuteException;
@@ -102,7 +103,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
public void run() {
try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
System.identityHashCode(DistributedScheduler.this), executable.getId())) {
- if (jobLock.lock(getLockPath(executable.getId()))) {
+
+ KylinConfig config = executable.getCubeSpecificConfig();
+ boolean isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(),
+ ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress());
+ logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId()
+ + (isAssigned ? " is " : " is not ") + "assigned on this server : " + ToolUtil.getHostName());
+ if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) {
logger.info(executable.toString() + " scheduled in server: " + serverName);
context.addRunningJob(executable);
diff --git a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java
index 994e4d6..f55e11d 100644
--- a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.common.util.ZipFileUtils;
-import org.apache.kylin.tool.util.ToolUtil;
+import org.apache.kylin.common.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
index 9063e9e..5d85308 100644
--- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.tool.util.ToolUtil;
+import org.apache.kylin.common.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
index 8fec48e..e71c5f6 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.tool.util.ToolUtil;
+import org.apache.kylin.common.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java b/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java
index a84345b..cb9fcd0 100644
--- a/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java
@@ -32,7 +32,7 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.metadata.model.DataModelManager;
import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.tool.util.ToolUtil;
+import org.apache.kylin.common.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;