You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/02/22 06:44:54 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4906 Support
query/job server dynamic register and discovery in kylin4
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 8a21389 KYLIN-4906 Support query/job server dynamic register and discovery in kylin4
8a21389 is described below
commit 8a21389f685dfbbcd9c9254e578d7e8dcf70f078
Author: zhengshengjun <sh...@sina.com>
AuthorDate: Thu Feb 18 18:02:21 2021 +0800
KYLIN-4906 Support query/job server dynamic register and discovery in kylin4
(cherry picked from commit fa37255b15bdf69e347ec747baf0731ce6634b39)
(cherry picked from commit fa37255b15bdf69e347ec747baf0731ce6634b39)
(cherry picked from commit fa37255b15bdf69e347ec747baf0731ce6634b39)
---
core-common/pom.xml | 5 +
.../java/org/apache/kylin/common/KConstants.java | 23 +++
.../java/org/apache/kylin/common/KylinConfig.java | 14 +-
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../java/org/apache/kylin/common/ServerMode.java | 71 --------
.../org/apache/kylin/common/util/ServerMode.java | 5 +-
.../common/zookeeper/KylinServerDiscovery.java | 197 +++++++++++++++++++++
.../kylin/common/zookeeper}/ExampleServer.java | 26 +--
.../common/zookeeper/KylinServerDiscoveryTest.java | 46 ++---
.../kylin/job/impl/curator/CuratorScheduler.java | 139 +--------------
.../job/impl/threadpool/DefaultScheduler.java | 11 +-
.../job/impl/threadpool/DistributedScheduler.java | 11 +-
.../kylin/rest/init/InitialSparderContext.java | 4 +-
.../kylin/rest/service/AclTableMigrationTool.java | 3 +-
.../org/apache/kylin/rest/service/JobService.java | 6 +
.../apache/kylin/rest/service/QueryService.java | 8 +-
16 files changed, 298 insertions(+), 275 deletions(-)
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 093d5ce..4680d4d 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -50,6 +50,11 @@
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-x-discovery</artifactId>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
</dependency>
diff --git a/core-common/src/main/java/org/apache/kylin/common/KConstants.java b/core-common/src/main/java/org/apache/kylin/common/KConstants.java
new file mode 100644
index 0000000..5e1723c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/KConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+public class KConstants {
+ public static final int DEFAULT_SERVICE_PORT = 7070;
+}
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 7805a0f..a7d5b8b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,7 +18,6 @@
package org.apache.kylin.common;
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.restclient.RestClient;
@@ -48,6 +47,9 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.kylin.shaded.com.google.common.base.Strings;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+
/**
*/
public class KylinConfig extends KylinConfigBase {
@@ -531,7 +533,15 @@ public class KylinConfig extends KylinConfigBase {
String value = entry.getValue().toString();
orderedProperties.setProperty(key, value);
}
-
+ // Reset some properties which might be overriden by system properties
+ String[] systemProps = { "kylin.server.cluster-servers", "kylin.server.cluster-servers-with-mode" };
+ for (String sysProp : systemProps) {
+ String sysPropValue = System.getProperty(sysProp);
+ if (!Strings.isNullOrEmpty(sysPropValue)) {
+ orderedProperties.setProperty(sysProp, sysPropValue);
+ }
+ }
+
final StringBuilder sb = new StringBuilder();
for (Map.Entry<String, String> entry : orderedProperties.entrySet()) {
sb.append(entry.getKey() + "=" + entry.getValue()).append('\n');
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0fd24e2..2ed1cb5 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2163,6 +2163,10 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.server.host-address", "localhost:7070");
}
+ public boolean getServerSelfDiscoveryEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.server.self-discovery-enabled", FALSE));
+ }
+
public String getClusterName() {
String key = "kylin.server.cluster-name";
String clusterName = this.getOptional(key, getMetadataUrlPrefix());
diff --git a/core-common/src/main/java/org/apache/kylin/common/ServerMode.java b/core-common/src/main/java/org/apache/kylin/common/ServerMode.java
deleted file mode 100644
index fb3624d..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/ServerMode.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.common;
-
-public enum ServerMode {
-
- ALL("all"), JOB("job"), QUERY("query");
-
- private final String name;
-
- ServerMode(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- private static void validate(KylinConfig config) {
- assert config != null;
- }
-
- private static boolean match(ServerMode serverMode, KylinConfig config) {
- validate(config);
- return serverMode.getName().equals(config.getServerMode());
- }
-
- public static boolean isJob(KylinConfig config) {
- return isJobOnly(config) || isAll(config);
- }
-
- public static boolean isJob(String serverMode) {
- return ALL.name.equals(serverMode) || JOB.name.equals(serverMode);
- }
-
- public static boolean isJobOnly(KylinConfig config) {
- return match(JOB, config);
- }
-
- public static boolean isQueryOnly(KylinConfig config) {
- return match(QUERY, config);
- }
-
- public static boolean isQuery(KylinConfig config) {
- return isQueryOnly(config) || isAll(config);
- }
-
- public static boolean isAll(KylinConfig config) {
- return match(ALL, config);
- }
-
- public static boolean isQuery(String serverMode) {
- return ALL.name.equals(serverMode) || QUERY.name.equals(serverMode);
- }
-}
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java b/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java
index 14b7f18..314abe1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/ServerMode.java
@@ -62,7 +62,10 @@ public class ServerMode {
public static ServerMode SERVER_MODE = getServerMode();
private static ServerMode getServerMode() {
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ return getServerMode(KylinConfig.getInstanceFromEnv());
+ }
+
+ public static ServerMode getServerMode(KylinConfig kylinConfig) {
String serverModeStr = kylinConfig.getServerMode();
List<String> serverModes = Lists.newArrayList();
String[] serverModeArray = serverModeStr.split("\\s*,\\s*");
diff --git a/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java b/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java
new file mode 100644
index 0000000..07f59ad
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/zookeeper/KylinServerDiscovery.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.zookeeper;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.x.discovery.ServiceCache;
+import org.apache.curator.x.discovery.ServiceDiscovery;
+import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
+import org.apache.curator.x.discovery.ServiceInstance;
+import org.apache.curator.x.discovery.details.InstanceSerializer;
+import org.apache.curator.x.discovery.details.ServiceCacheListener;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.common.util.ZKUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class KylinServerDiscovery implements Closeable {
+
+ private static final Logger logger = LoggerFactory.getLogger(KylinServerDiscovery.class);
+
+ public static final String SERVICE_PATH = "/service";
+ public static final String SERVICE_NAME = "cluster_servers";
+ public static final String SERVICE_PAYLOAD_DESCRIPTION = "description";
+
+ private static class SingletonHolder {
+ private static final KylinServerDiscovery INSTANCE = new KylinServerDiscovery();
+ }
+
+ public static KylinServerDiscovery getInstance() {
+ return SingletonHolder.INSTANCE;
+ }
+
+ private final KylinConfig kylinConfig;
+ private final CuratorFramework curator;
+ private final ServiceDiscovery<LinkedHashMap> serviceDiscovery;
+ private final ServiceCache<LinkedHashMap> serviceCache;
+
+ private KylinServerDiscovery() {
+ this(KylinConfig.getInstanceFromEnv());
+ }
+
+ @VisibleForTesting
+ protected KylinServerDiscovery(KylinConfig kylinConfig) {
+ this.kylinConfig = kylinConfig;
+ this.curator = ZKUtil.getZookeeperClient(kylinConfig);
+ try {
+ final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class);
+ serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curator)
+ .basePath(SERVICE_PATH).serializer(serializer).build();
+ serviceDiscovery.start();
+
+ serviceCache = serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME)
+ .threadFactory(
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("KylinServerTracker-%d").build())
+ .build();
+
+ final AtomicBoolean isFinishInit = new AtomicBoolean(false);
+ serviceCache.addListener(new ServiceCacheListener() {
+ @Override
+ public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
+ }
+
+ @Override
+ public void cacheChanged() {
+ logger.info("Service discovery get cacheChanged notification");
+ final List<ServiceInstance<LinkedHashMap>> instances = serviceCache.getInstances();
+ Map<String, String> instanceNodes = Maps.newHashMapWithExpectedSize(instances.size());
+ for (ServiceInstance<LinkedHashMap> entry : instances) {
+ instanceNodes.put(entry.getAddress() + ":" + entry.getPort(),
+ (String) entry.getPayload().get(SERVICE_PAYLOAD_DESCRIPTION));
+ }
+
+ logger.info("kylin.server.cluster-servers update to " + instanceNodes);
+ // update cluster servers
+ System.setProperty("kylin.server.cluster-servers", StringUtil.join(instanceNodes.keySet(), ","));
+
+ // get servers and its mode(query, job, all)
+ final String restServersInClusterWithMode = StringUtil.join(instanceNodes.entrySet().stream()
+ .map(input -> input.getKey() + ":" + input.getValue()).collect(Collectors.toList()), ",");
+ logger.info("kylin.server.cluster-servers-with-mode update to " + restServersInClusterWithMode);
+ System.setProperty("kylin.server.cluster-servers-with-mode", restServersInClusterWithMode);
+ isFinishInit.set(true);
+ }
+ });
+ serviceCache.start();
+
+ registerSelf();
+ while (!isFinishInit.get()) {
+ logger.info("Haven't registered, waiting ...");
+ Thread.sleep(100L);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to initialize due to ", e);
+ }
+ }
+
+ private void registerSelf() throws Exception {
+ String hostAddr = kylinConfig.getServerRestAddress();
+ String[] hostAddrInfo = hostAddr.split(":");
+ if (hostAddrInfo.length < 2) {
+ logger.error("kylin.server.host-address {} is not qualified ", hostAddr);
+ throw new RuntimeException("kylin.server.host-address " + hostAddr + " is not qualified");
+ }
+ String host = hostAddrInfo[0];
+ int port = Integer.parseInt(hostAddrInfo[1]);
+
+ String serverMode = kylinConfig.getServerMode();
+ registerServer(host, port, serverMode);
+ }
+
+ private void registerServer(String host, int port, String mode) throws Exception {
+ final LinkedHashMap<String, String> instanceDetail = new LinkedHashMap<>();
+ instanceDetail.put(SERVICE_PAYLOAD_DESCRIPTION, mode);
+
+ ServiceInstance<LinkedHashMap> thisInstance = ServiceInstance.<LinkedHashMap> builder().name(SERVICE_NAME)
+ .payload(instanceDetail).port(port).address(host).build();
+
+ for (ServiceInstance<LinkedHashMap> instance : serviceCache.getInstances()) {
+ // Check for registered instances to avoid being double registered
+ if (instance.getAddress().equals(thisInstance.getAddress())
+ && instance.getPort().equals(thisInstance.getPort())) {
+ serviceDiscovery.unregisterService(instance);
+ }
+ }
+ serviceDiscovery.registerService(thisInstance);
+ }
+
+ @Override
+ public void close() throws IOException {
+ IOUtils.closeQuietly(serviceCache);
+ IOUtils.closeQuietly(serviceDiscovery);
+ }
+
+ static class JsonInstanceSerializer<T> implements InstanceSerializer<T> {
+ private final ObjectMapper mapper;
+ private final Class<T> payloadClass;
+ private final JavaType type;
+
+ JsonInstanceSerializer(Class<T> payloadClass) {
+ this.payloadClass = payloadClass;
+ this.mapper = new ObjectMapper();
+
+ // to bypass https://issues.apache.org/jira/browse/CURATOR-394
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ this.type = this.mapper.getTypeFactory().constructType(ServiceInstance.class);
+ }
+
+ public ServiceInstance<T> deserialize(byte[] bytes) throws Exception {
+ ServiceInstance rawServiceInstance = this.mapper.readValue(bytes, this.type);
+ this.payloadClass.cast(rawServiceInstance.getPayload());
+ return rawServiceInstance;
+ }
+
+ public byte[] serialize(ServiceInstance<T> instance) throws Exception {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ mapper.convertValue(instance.getPayload(), payloadClass);
+ this.mapper.writeValue(out, instance);
+ return out.toByteArray();
+ }
+ }
+}
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java b/core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java
similarity index 61%
rename from core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
rename to core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java
index 66e3832..9e9fe95 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/ExampleServer.java
+++ b/core-common/src/test/java/org/apache/kylin/common/zookeeper/ExampleServer.java
@@ -16,38 +16,28 @@
* limitations under the License.
*/
-package org.apache.kylin.job.impl.curator;
+package org.apache.kylin.common.zookeeper;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.curator.framework.CuratorFramework;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ZKUtil;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.lock.MockJobLock;
/**
*/
public class ExampleServer implements Closeable {
private String address;
- private CuratorScheduler scheduler;
+ private KylinServerDiscovery discovery;
- public ExampleServer(String address) throws Exception {
+ public ExampleServer(String address) {
this.address = address;
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
KylinConfig kylinConfig1 = KylinConfig.createKylinConfig(kylinConfig);
kylinConfig1.setProperty("kylin.server.host-address", address);
- CuratorFramework client = ZKUtil.newZookeeperClient(kylinConfig1);
- scheduler = new CuratorScheduler(client);
- scheduler.init(new JobEngineConfig(kylinConfig1), new MockJobLock());
- if (!scheduler.hasStarted()) {
- throw new RuntimeException("scheduler has not been started");
- }
+ discovery = new KylinServerDiscovery(kylinConfig1);
}
public String getAddress() {
@@ -56,13 +46,7 @@ public class ExampleServer implements Closeable {
@Override
public void close() throws IOException {
-
- if (scheduler!= null)
- try {
- scheduler.shutdown();
- } catch (SchedulerException e) {
- //
- }
+ discovery.close();
}
}
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java b/core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java
similarity index 71%
rename from core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
rename to core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java
index 4cf1410..77dc673 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/curator/CuratorSchedulerTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/zookeeper/KylinServerDiscoveryTest.java
@@ -15,13 +15,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kylin.job.impl.curator;
+package org.apache.kylin.common.zookeeper;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
-
-import javax.annotation.Nullable;
+import java.util.stream.Collectors;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -32,7 +31,6 @@ import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.apache.kylin.common.util.ZKUtil;
-import org.apache.kylin.job.execution.ExecutableManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -40,19 +38,16 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.shaded.com.google.common.base.Function;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
/**
*/
-public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
+public class KylinServerDiscoveryTest extends LocalFileMetadataTestCase {
- private static final Logger logger = LoggerFactory.getLogger(CuratorSchedulerTest.class);
+ private static final Logger logger = LoggerFactory.getLogger(KylinServerDiscoveryTest.class);
private TestingServer zkTestServer;
- protected ExecutableManager jobService;
-
@Before
public void setup() throws Exception {
zkTestServer = new TestingServer();
@@ -80,10 +75,9 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
ServiceDiscovery<LinkedHashMap> serviceDiscovery = null;
CuratorFramework curatorClient = null;
try {
-
- final CuratorScheduler.JsonInstanceSerializer<LinkedHashMap> serializer = new CuratorScheduler.JsonInstanceSerializer<>(
- LinkedHashMap.class);
- String servicePath = CuratorScheduler.KYLIN_SERVICE_PATH;
+ String servicePath = KylinServerDiscovery.SERVICE_PATH;
+ final KylinServerDiscovery.JsonInstanceSerializer<LinkedHashMap> serializer =
+ new KylinServerDiscovery.JsonInstanceSerializer<>(LinkedHashMap.class);
curatorClient = ZKUtil.newZookeeperClient(zkString, new ExponentialBackoffRetry(3000, 3));
serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient)
.basePath(servicePath).serializer(serializer).build();
@@ -94,36 +88,32 @@ public class CuratorSchedulerTest extends LocalFileMetadataTestCase {
Collection<String> serviceNames = serviceDiscovery.queryForNames();
Assert.assertTrue(serviceNames.size() == 1);
- Assert.assertTrue(CuratorScheduler.SERVICE_NAME.equals(serviceNames.iterator().next()));
+ Assert.assertTrue(KylinServerDiscovery.SERVICE_NAME.equals(serviceNames.iterator().next()));
Collection<ServiceInstance<LinkedHashMap>> instances = serviceDiscovery
- .queryForInstances(CuratorScheduler.SERVICE_NAME);
+ .queryForInstances(KylinServerDiscovery.SERVICE_NAME);
Assert.assertTrue(instances.size() == 2);
List<ServiceInstance<LinkedHashMap>> instancesList = Lists.newArrayList(instances);
- final List<String> instanceNodes = Lists.transform(instancesList,
- new Function<ServiceInstance<LinkedHashMap>, String>() {
-
- @Nullable
- @Override
- public String apply(@Nullable ServiceInstance<LinkedHashMap> stringServiceInstance) {
- return (String) stringServiceInstance.getPayload()
- .get(CuratorScheduler.SERVICE_PAYLOAD_DESCRIPTION);
- }
- });
+ final List<String> instanceNodes = instancesList.stream()
+ .map(input -> input.getAddress() + ":" + input.getPort() + ":"
+ + input.getPayload().get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION))
+ .collect(Collectors.toList());
Assert.assertTrue(instanceNodes.contains(server1.getAddress() + ":query"));
Assert.assertTrue(instanceNodes.contains(server2.getAddress() + ":query"));
// stop one server
server1.close();
- instances = serviceDiscovery.queryForInstances(CuratorScheduler.SERVICE_NAME);
+ instances = serviceDiscovery.queryForInstances(KylinServerDiscovery.SERVICE_NAME);
+ ServiceInstance<LinkedHashMap> existingInstance = instances.iterator().next();
Assert.assertTrue(instances.size() == 1);
Assert.assertEquals(server2.getAddress() + ":query",
- instances.iterator().next().getPayload().get(CuratorScheduler.SERVICE_PAYLOAD_DESCRIPTION));
+ existingInstance.getAddress() + ":" + existingInstance.getPort() + ":"
+ + existingInstance.getPayload().get(KylinServerDiscovery.SERVICE_PAYLOAD_DESCRIPTION));
// all stop
server2.close();
- instances = serviceDiscovery.queryForInstances(CuratorScheduler.SERVICE_NAME);
+ instances = serviceDiscovery.queryForInstances(KylinServerDiscovery.SERVICE_NAME);
Assert.assertTrue(instances.size() == 0);
} finally {
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
index faa7d71..931dc8a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/curator/CuratorScheduler.java
@@ -18,30 +18,16 @@
package org.apache.kylin.job.impl.curator;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
-import java.util.LinkedHashMap;
-import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.state.ConnectionState;
-import org.apache.curator.x.discovery.ServiceCache;
-import org.apache.curator.x.discovery.ServiceDiscovery;
-import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
-import org.apache.curator.x.discovery.ServiceInstance;
-import org.apache.curator.x.discovery.details.InstanceSerializer;
-import org.apache.curator.x.discovery.details.ServiceCacheListener;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.ServerMode;
-import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.common.util.ServerMode;
import org.apache.kylin.common.util.ZKUtil;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -51,12 +37,7 @@ import org.apache.kylin.job.lock.JobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.kylin.shaded.com.google.common.base.Function;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import com.google.common.annotations.VisibleForTesting;
public class CuratorScheduler implements Scheduler<AbstractExecutable> {
@@ -64,15 +45,10 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
private boolean started = false;
private CuratorFramework curatorClient = null;
private static CuratorLeaderSelector jobClient = null;
- private ServiceDiscovery<LinkedHashMap> serviceDiscovery = null;
- private ServiceCache<LinkedHashMap> serviceCache = null;
private KylinConfig kylinConfig;
private AtomicInteger count = new AtomicInteger();
static final String JOB_ENGINE_LEADER_PATH = "/job_engine/leader";
- static final String KYLIN_SERVICE_PATH = "/service";
- static final String SERVICE_NAME = "kylin";
- static final String SERVICE_PAYLOAD_DESCRIPTION = "description";
// the default constructor should exist for reflection initialization
public CuratorScheduler() {
@@ -100,17 +76,11 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
curatorClient = ZKUtil.getZookeeperClient(kylinConfig);
}
- final String serverMode = jobEngineConfig.getConfig().getServerMode();
- final String restAddress = kylinConfig.getServerRestAddress();
- try {
- registerInstance(restAddress, serverMode);
- } catch (Exception e) {
- throw new SchedulerException(e);
- }
+ String restAddress = kylinConfig.getServerRestAddress();
String jobEnginePath = JOB_ENGINE_LEADER_PATH;
- if (ServerMode.isJob(jobEngineConfig.getConfig())) {
+ if (ServerMode.getServerMode(kylinConfig).canServeJobBuild()) {
jobClient = new CuratorLeaderSelector(curatorClient, jobEnginePath, restAddress, jobEngineConfig);
try {
logger.info("start Job Engine, lock path is: " + jobEnginePath);
@@ -120,79 +90,13 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
throw new SchedulerException(e);
}
} else {
- logger.info("server mode: " + jobEngineConfig.getConfig().getServerMode() + ", no need to run job scheduler");
+ logger.info("server mode: " + jobEngineConfig.getConfig().getServerMode()
+ + ", no need to run job scheduler");
}
started = true;
}
}
- private void registerInstance(String restAddress, String mode) throws Exception {
- final String host = restAddress.substring(0, restAddress.indexOf(":"));
- final String port = restAddress.substring(restAddress.indexOf(":") + 1);
-
- final JsonInstanceSerializer<LinkedHashMap> serializer = new JsonInstanceSerializer<>(LinkedHashMap.class);
- final String servicePath = KYLIN_SERVICE_PATH;
- serviceDiscovery = ServiceDiscoveryBuilder.builder(LinkedHashMap.class).client(curatorClient)
- .basePath(servicePath).serializer(serializer).build();
- serviceDiscovery.start();
-
- serviceCache = serviceDiscovery.serviceCacheBuilder().name(SERVICE_NAME)
- .threadFactory(Executors.defaultThreadFactory()).build();
-
- serviceCache.addListener(new ServiceCacheListener() {
- @Override
- public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
- }
-
- @Override
- public void cacheChanged() {
- logger.info("Service discovery get cacheChanged notification");
- final List<ServiceInstance<LinkedHashMap>> instances = serviceCache.getInstances();
- final List<String> instanceNodes = Lists.transform(instances,
- new Function<ServiceInstance<LinkedHashMap>, String>() {
-
- @Nullable
- @Override
- public String apply(@Nullable ServiceInstance<LinkedHashMap> stringServiceInstance) {
- return (String) stringServiceInstance.getPayload().get(SERVICE_PAYLOAD_DESCRIPTION);
- }
- });
-
- final String restServersInCluster = //
- StringUtil.join(instanceNodes.stream().map(input -> { //
- String[] split = input.split(":"); //
- return split[0] + ":" + split[1]; //
- }).collect(Collectors.toList()), ","); //
-
-
- logger.info("kylin.server.cluster-servers update to " + restServersInCluster);
- // update cluster servers
- System.setProperty("kylin.server.cluster-servers", restServersInCluster);
-
- // get servers and its mode(query, job, all)
- final String restServersInClusterWithMode = StringUtil.join(instanceNodes, ",");
- logger.info("kylin.server.cluster-servers-with-mode update to " + restServersInClusterWithMode);
- System.setProperty("kylin.server.cluster-servers-with-mode", restServersInClusterWithMode);
- }
- });
- serviceCache.start();
-
- final LinkedHashMap<String, String> instanceDetail = new LinkedHashMap<>();
-
- instanceDetail.put(SERVICE_PAYLOAD_DESCRIPTION, restAddress + ":" + mode);
- ServiceInstance<LinkedHashMap> thisInstance = ServiceInstance.<LinkedHashMap> builder().name(SERVICE_NAME)
- .payload(instanceDetail).port(Integer.valueOf(port)).address(host).build();
-
- for (ServiceInstance<LinkedHashMap> instance : serviceCache.getInstances()) {
- // Check for registered instances to avoid being double registered
- if (instance.getAddress().equals(thisInstance.getAddress())
- && instance.getPort().equals(thisInstance.getPort())) {
- serviceDiscovery.unregisterService(instance);
- }
- }
- serviceDiscovery.registerService(thisInstance);
- }
-
private void monitorJobEngine() {
logger.info("Start collect monitor ZK Participants");
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable() {
@@ -220,8 +124,6 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
@Override
public void shutdown() throws SchedulerException {
- IOUtils.closeQuietly(serviceCache);
- IOUtils.closeQuietly(serviceDiscovery);
IOUtils.closeQuietly(curatorClient);
IOUtils.closeQuietly(jobClient);
started = false;
@@ -248,33 +150,4 @@ public class CuratorScheduler implements Scheduler<AbstractExecutable> {
return jobClient;
}
- static class JsonInstanceSerializer<T> implements InstanceSerializer<T> {
- private final ObjectMapper mapper;
- private final Class<T> payloadClass;
- private final JavaType type;
-
- JsonInstanceSerializer(Class<T> payloadClass) {
- this.payloadClass = payloadClass;
- this.mapper = new ObjectMapper();
-
- // to bypass https://issues.apache.org/jira/browse/CURATOR-394
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-
- this.type = this.mapper.getTypeFactory().constructType(ServiceInstance.class);
- }
-
- public ServiceInstance<T> deserialize(byte[] bytes) throws Exception {
- ServiceInstance rawServiceInstance = this.mapper.readValue(bytes, this.type);
- this.payloadClass.cast(rawServiceInstance.getPayload());
- return rawServiceInstance;
- }
-
- public byte[] serialize(ServiceInstance<T> instance) throws Exception {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- mapper.convertValue(instance.getPayload(), payloadClass);
- this.mapper.writeValue(out, instance);
- return out.toByteArray();
- }
- }
-
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index f45c2e4..9c4573e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -18,7 +18,6 @@
package org.apache.kylin.job.impl.threadpool;
-import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -26,6 +25,7 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.kylin.common.util.ServerMode;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.engine.JobEngineConfig;
@@ -35,11 +35,10 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.Executable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-
/**
*/
public class DefaultScheduler implements Scheduler<AbstractExecutable> {
@@ -134,9 +133,9 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable> {
public synchronized void init(JobEngineConfig jobEngineConfig, JobLock lock) throws SchedulerException {
jobLock = lock;
- String serverMode = jobEngineConfig.getConfig().getServerMode();
- if (!("job".equals(serverMode.toLowerCase(Locale.ROOT)) || "all".equals(serverMode.toLowerCase(Locale.ROOT)))) {
- logger.info("server mode: " + serverMode + ", no need to run job scheduler");
+ if (!ServerMode.SERVER_MODE.canServeJobBuild()) {
+ logger.info(
+ "server mode: " + jobEngineConfig.getConfig().getServerMode() + ", no need to run job scheduler");
return;
}
logger.info("Initializing Job Engine ....");
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 77717bc..51e7dc0 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -20,7 +20,6 @@ package org.apache.kylin.job.impl.threadpool;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Locale;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
@@ -33,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
+import org.apache.kylin.common.util.ServerMode;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.common.util.ToolUtil;
@@ -48,11 +48,10 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-
/**
* schedule the cubing jobs when several job server running with the same metadata.
*
@@ -107,9 +106,9 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable> {
@Override
public synchronized void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException {
- String serverMode = jobEngineConfig.getConfig().getServerMode();
- if (!("job".equals(serverMode.toLowerCase(Locale.ROOT)) || "all".equals(serverMode.toLowerCase(Locale.ROOT)))) {
- logger.info("server mode: " + serverMode + ", no need to run job scheduler");
+ if (!ServerMode.SERVER_MODE.canServeJobBuild()) {
+ logger.info(
+ "server mode: " + jobEngineConfig.getConfig().getServerMode() + ", no need to run job scheduler");
return;
}
logger.info("Initializing Job Engine ....");
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java
index 49714bd..d57c9ce 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialSparderContext.java
@@ -19,7 +19,7 @@
package org.apache.kylin.rest.init;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.ServerMode;
+import org.apache.kylin.common.util.ServerMode;
import org.apache.spark.sql.SparderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +36,7 @@ public class InitialSparderContext implements InitializingBean {
private void runInitialSparder() {
KylinConfig config = KylinConfig.getInstanceFromEnv();
- if (ServerMode.isJobOnly(config) || !config.isAutoStartSparder()) {
+ if (!ServerMode.SERVER_MODE.canServeQuery() || !config.isAutoStartSparder()) {
logger.info("Maybe this is job node, or switch is off, do not need to start Spark, {}", config.isAutoStartSparder());
return;
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java b/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
index 42db50d..206c618 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AclTableMigrationTool.java
@@ -39,6 +39,7 @@ import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.StringEntity;
import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ServerMode;
import org.apache.kylin.rest.security.AclConstant;
import org.apache.kylin.rest.security.ManagedUser;
import org.apache.kylin.rest.security.springacl.AclRecord;
@@ -75,7 +76,7 @@ public class AclTableMigrationTool {
logger.info("Do not need to migrate acl table data");
return;
} else {
- if (!kylinConfig.getServerMode().equals("all")) {
+ if (!ServerMode.SERVER_MODE.canServeAll()) {
throw new IllegalStateException(
"Please make sure that you have config kylin.server.mode=all before migrating data");
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index e543c22..7cb5d3d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -38,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.zookeeper.KylinServerDiscovery;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -134,6 +135,11 @@ public class JobService extends BasicService implements InitializingBean {
final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory
.scheduler(kylinConfig.getSchedulerType());
+ if (kylinConfig.getServerSelfDiscoveryEnabled()) {
+ KylinServerDiscovery.getInstance();
+ }
+ logger.info("Cluster servers: {}", Lists.newArrayList(kylinConfig.getRestServers()));
+
scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock());
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 7296636..b8f1c56 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -77,6 +77,7 @@ import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.DBUtils;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.ServerMode;
import org.apache.kylin.common.util.SetThreadName;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
@@ -408,10 +409,9 @@ public class QueryService extends BasicService {
sqlRequest.setUsername(getUserName());
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- String serverMode = kylinConfig.getServerMode();
- if (!(Constant.SERVER_MODE_QUERY.equals(serverMode.toLowerCase(Locale.ROOT))
- || Constant.SERVER_MODE_ALL.equals(serverMode.toLowerCase(Locale.ROOT)))) {
- throw new BadRequestException(String.format(Locale.ROOT, msg.getQUERY_NOT_ALLOWED(), serverMode));
+ if (!ServerMode.SERVER_MODE.canServeQuery()) {
+ throw new BadRequestException(
+ String.format(Locale.ROOT, msg.getQUERY_NOT_ALLOWED(), kylinConfig.getServerMode()));
}
if (StringUtils.isBlank(sqlRequest.getProject())) {
throw new BadRequestException(msg.getEMPTY_PROJECT_NAME());