You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/24 11:39:59 UTC

[kylin] branch master updated: KYLIN-4249:DistributedScheduler can selectively assign task nodes according to cube extra configuration

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new b72781c  KYLIN-4249:DistributedScheduler can selectively assign task nodes according to cube extra configuration
b72781c is described below

commit b72781c3d89e3c5fd5bc1f9363d1dc4839e3c3d2
Author: zhangxiang17 <zh...@58.com>
AuthorDate: Mon Nov 11 13:02:30 2019 +0800

    KYLIN-4249:DistributedScheduler can selectively assign task nodes according to cube extra configuration
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 32 ++++++++++++++++-
 .../org/apache/kylin/common}/util/ToolUtil.java    | 38 +++++++++++++++++++-
 .../org/apache/kylin/common/util/ToolUtilTest.java | 41 ++++++++++++++++++++++
 .../job/impl/threadpool/DistributedScheduler.java  |  9 ++++-
 .../apache/kylin/tool/AbstractInfoExtractor.java   |  2 +-
 .../org/apache/kylin/tool/DiagnosisInfoCLI.java    |  2 +-
 .../org/apache/kylin/tool/JobDiagnosisInfoCLI.java |  2 +-
 .../org/apache/kylin/tool/KylinLogExtractor.java   |  2 +-
 8 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 91ac9ff..73b8f01 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -731,6 +731,36 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.cube.cubeplanner.algorithm-threshold-genetic", "23"));
     }
 
+    /**
+     * get assigned server array, which a empty string array in default
+     * @return
+     */
+    public String[] getAssignedServers() {
+        return getOptionalStringArray("kylin.cube.schedule.assigned-servers", new String[] {});
+    }
+
+    /**
+     * Determine if the target node is in the assigned node
+     * @param targetServers target task servers
+     * @return
+     */
+    public boolean isOnAssignedServer(String... targetServers) {
+
+        String[] servers = this.getAssignedServers();
+        if (null == servers || servers.length == 0) {
+            return true;
+        }
+
+        for (String s : servers) {
+            for (String ts : targetServers) {
+                if (s.equalsIgnoreCase(ts)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     // ============================================================================
     // JOB
     // ============================================================================
@@ -1526,7 +1556,7 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark-fact-distinct", "false"));
     }
 
-    public boolean isSparkCardinalityEnabled(){
+    public boolean isSparkCardinalityEnabled() {
         return Boolean.parseBoolean(getOptional("kylin.engine.spark-cardinality", "false"));
     }
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java
similarity index 71%
rename from tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java
rename to core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java
index 842beb2..35075a6 100644
--- a/tool/src/main/java/org/apache/kylin/tool/util/ToolUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ToolUtil.java
@@ -17,7 +17,7 @@
  *
  */
 
-package org.apache.kylin.tool.util;
+package org.apache.kylin.common.util;
 
 import com.google.common.collect.Maps;
 import org.apache.commons.lang.StringUtils;
@@ -26,8 +26,13 @@ import org.apache.kylin.common.persistence.ResourceStore;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
 import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
 import java.net.UnknownHostException;
+import java.util.Enumeration;
 import java.util.Map;
 
 public class ToolUtil {
@@ -90,4 +95,35 @@ public class ToolUtil {
         }
         return hostname;
     }
+
+    public static InetAddress getFirstNonLoopbackAddress(boolean preferIpv4, boolean preferIPv6)
+            throws SocketException {
+        Enumeration en = NetworkInterface.getNetworkInterfaces();
+        while (en.hasMoreElements()) {
+            NetworkInterface element = (NetworkInterface) en.nextElement();
+            for (Enumeration en2 = element.getInetAddresses(); en2.hasMoreElements();) {
+                InetAddress addr = (InetAddress) en2.nextElement();
+                if (!addr.isLoopbackAddress()) {
+                    if (addr instanceof Inet4Address) {
+                        if (preferIPv6) {
+                            continue;
+                        }
+                        return addr;
+                    }
+                    if (addr instanceof Inet6Address) {
+                        if (preferIpv4) {
+                            continue;
+                        }
+                        return addr;
+                    }
+                }
+            }
+        }
+        return null;
+    }
+
+    public static InetAddress getFirstIPV4NonLoopBackAddress() throws SocketException {
+        return getFirstNonLoopbackAddress(true, false);
+    }
+
 }
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java
new file mode 100644
index 0000000..8161d43
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/util/ToolUtilTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.kylin.common.util;
+
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ToolUtilTest {
+
+    @Test
+    public void testGetFirstIPV4NonLoopBackAddress() throws Exception {
+
+        InetAddress localIp = ToolUtil.getFirstIPV4NonLoopBackAddress();
+        boolean isLoopBackIp = localIp.isLoopbackAddress();
+        Assert.assertNotNull("localIp is null", localIp);
+        Assert.assertEquals(false, isLoopBackIp);
+        Assert.assertEquals(true, localIp instanceof Inet4Address);
+        Assert.assertEquals(false, localIp instanceof Inet6Address);
+    }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 8ea1533..4df9221 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
 import org.apache.kylin.common.util.SetThreadName;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.common.util.ToolUtil;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -102,7 +103,13 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
         public void run() {
             try (SetThreadName ignored = new SetThreadName("Scheduler %s Job %s",
                     System.identityHashCode(DistributedScheduler.this), executable.getId())) {
-                if (jobLock.lock(getLockPath(executable.getId()))) {
+
+                KylinConfig config = executable.getCubeSpecificConfig();
+                boolean isAssigned = config.isOnAssignedServer(ToolUtil.getHostName(),
+                        ToolUtil.getFirstIPV4NonLoopBackAddress().getHostAddress());
+                logger.debug("cube = " + executable.getCubeName() + "; jobId=" + executable.getId()
+                        + (isAssigned ? " is " : " is not ") + "assigned on this server : " + ToolUtil.getHostName());
+                if (isAssigned && jobLock.lock(getLockPath(executable.getId()))) {
                     logger.info(executable.toString() + " scheduled in server: " + serverName);
 
                     context.addRunningJob(executable);
diff --git a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java
index 994e4d6..f55e11d 100644
--- a/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/AbstractInfoExtractor.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.KylinVersion;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.common.util.ZipFileUtils;
-import org.apache.kylin.tool.util.ToolUtil;
+import org.apache.kylin.common.util.ToolUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
index 9063e9e..5d85308 100644
--- a/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/DiagnosisInfoCLI.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.tool.util.ToolUtil;
+import org.apache.kylin.common.util.ToolUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
index 8fec48e..e71c5f6 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
@@ -36,7 +36,7 @@ import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.tool.util.ToolUtil;
+import org.apache.kylin.common.util.ToolUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java b/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java
index a84345b..cb9fcd0 100644
--- a/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/KylinLogExtractor.java
@@ -32,7 +32,7 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.metadata.model.DataModelManager;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.tool.util.ToolUtil;
+import org.apache.kylin.common.util.ToolUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;