Posted to commits@kylin.apache.org by xx...@apache.org on 2023/02/14 05:54:54 UTC
[kylin] 25/33: KYLIN-5442 Optimized for loading Kafka Kerberos keyTab
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c16c2e712ccc98c74bed7368d44fedcd05a66584
Author: Jiale He <35...@users.noreply.github.com>
AuthorDate: Fri Dec 9 17:18:10 2022 +0800
KYLIN-5442 Optimized for loading Kafka Kerberos keyTab
---
.../common/exception/code/ErrorCodeServer.java | 3 +
.../resources/kylin_error_msg_conf_cn.properties | 3 +
.../resources/kylin_error_msg_conf_en.properties | 3 +
.../kylin_error_suggestion_conf_cn.properties | 5 +
.../kylin_error_suggestion_conf_en.properties | 6 +
.../main/resources/kylin_errorcode_conf.properties | 4 +
.../org/apache/kylin/kafka/util/KafkaUtils.java | 5 +-
.../streaming/constants/StreamingConstants.java | 6 +
.../kylin/streaming/jobs/StreamingJobListener.java | 20 +--
.../kylin/streaming/jobs/StreamingJobUtils.java | 145 ++++++++++++++++++---
.../streaming/jobs/impl/StreamingJobLauncher.java | 132 ++++++++++---------
.../kylin/streaming/CreateStreamingFlatTable.scala | 3 -
.../apache/kylin/kafka/util/KafkaUtilsTest.java | 6 +-
.../streaming/jobs/StreamingJobListenerTest.java | 32 ++---
.../streaming/jobs/StreamingJobUtilsTest.java | 98 ++++++++++++--
.../jobs/impl/StreamingJobLauncherTest.java | 33 ++++-
16 files changed, 368 insertions(+), 136 deletions(-)
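For reference, the Kafka JAAS file that this change parses and rewrites has the shape used in the new test fixtures below; the keyTab path here is illustrative:

    KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/etc/security/keytabs/kafka.keytab"
      principal="kylin@DEV.COM"
      serviceName="kafka";
    };

The reworked extractKafkaJaasConf(true) returns the body between the braces with the keyTab left as an absolute path (for the local Kafka consumer), while createExecutorJaas() writes a copy of the KafkaClient section under {KYLIN_HOME}/hadoop_conf with the keyTab reduced to its bare file name, because the keytab itself is distributed to the executor working directories via sparkLauncher.addFile.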
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
index 7b60e33ab2..b5eb0fd98c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/code/ErrorCodeServer.java
@@ -145,6 +145,9 @@ public enum ErrorCodeServer implements ErrorCodeProducer {
// 100352XX Streaming
STREAMING_PARSE_MESSAGE_ERROR("KE-010035202"),
+ READ_KAFKA_JAAS_FILE_ERROR("KE-010035215"),
+ KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS("KE-010035216"),
+ KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS("KE-010035217"),
// 100422XX CUSTOM PARSER
CUSTOM_PARSER_NOT_JAR("KE-010042201"),
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
index cf284ffba4..be8bca221a 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_cn.properties
@@ -136,6 +136,9 @@ KE-010043221=参数 “%s” 已存在。请检查后重试。
## Streaming
KE-010035202=使用解析器 “%s” 解析Topic “%s” 的消息时发生异常,请检查后重试。
+KE-010035215=无法正确读取 Kafka 认证文件,请检查后再试。
+KE-010035216=Kafka 认证文件中的 keyTab 文件不存在,请检查后重试。
+KE-010035217=Kafka 认证文件中不存在 “KafkaClient”,请检查后重试。
## XX 100422 Custom Parser
diff --git a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
index f8a170e52e..b44c0ef422 100644
--- a/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_msg_conf_en.properties
@@ -133,6 +133,9 @@ KE-010043221=The parameter “%s” already exists. Please check and try again.
## Streaming
KE-010035202=An exception occurred while parsing the messages of Topic "%2$s" with parser "%1$s". Please check and try again.
+KE-010035215=Can't read Kafka authentication file correctly. Please check and try again.
+KE-010035216=The keyTab file in the Kafka authentication file does not exist, please check and try again.
+KE-010035217="KafkaClient" does not exist in the Kafka authentication file, please check and try again.
## XX 100422 Custom Parser
KE-010042201=The file format is invalid. Please use jar format file.
diff --git a/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties b/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
index 10d5e8e0e4..059fa231e7 100644
--- a/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
+++ b/src/core-common/src/main/resources/kylin_error_suggestion_conf_cn.properties
@@ -128,6 +128,11 @@ KE-010043219=
KE-010043220=
KE-010043221=
+## Streaming
+KE-010035215=
+KE-010035216=
+KE-010035217=
+
# System
## 400052XX password
KE-040005201=
diff --git a/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties b/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
index 1b5ffcc1c2..05ce078ac1 100644
--- a/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_error_suggestion_conf_en.properties
@@ -128,6 +128,12 @@ KE-010043219=
KE-010043220=
KE-010043221=
+
+## Streaming
+KE-010035215=
+KE-010035216=
+KE-010035217=
+
# System
## 400052XX password
KE-040005201=
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf.properties b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
index a80a9c19a9..074fb6550e 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf.properties
@@ -139,6 +139,10 @@ KE-010043221
## Streaming
KE-010035202
+KE-010035215
+KE-010035216
+KE-010035217
+
## Custom Parser
KE-010042201
diff --git a/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java b/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
index a0cd5905c4..26bb14d07d 100644
--- a/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
+++ b/src/streaming/src/main/java/org/apache/kylin/kafka/util/KafkaUtils.java
@@ -67,8 +67,7 @@ public class KafkaUtils {
return getKafkaConsumer(brokers, groupId, new Properties());
}
- public static Consumer<String, ByteBuffer> getKafkaConsumer(String brokers, String groupId,
- Properties properties) {
+ public static Consumer<String, ByteBuffer> getKafkaConsumer(String brokers, String groupId, Properties properties) {
Properties props = getConsumerProperties(brokers, groupId, properties);
if (mockup != null) {
return mockup;
@@ -106,7 +105,7 @@ public class KafkaUtils {
props.putAll(KylinConfig.getInstanceFromEnv().getStreamingKafkaConfigOverride());
synchronized (kafkaJaasTextPair) {
if (Boolean.FALSE.equals(kafkaJaasTextPair.getFirst())) {
- kafkaJaasTextPair.setSecond(StreamingJobUtils.extractKafkaSaslJaasConf());
+ kafkaJaasTextPair.setSecond(StreamingJobUtils.extractKafkaJaasConf(true));
kafkaJaasTextPair.setFirst(true);
}
}
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java b/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
index 1181f22c01..479c496e69 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/constants/StreamingConstants.java
@@ -21,6 +21,9 @@ package org.apache.kylin.streaming.constants;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.metadata.HDFSMetadataStore;
+import lombok.experimental.UtilityClass;
+
+@UtilityClass
public class StreamingConstants {
// spark job conf
@@ -38,6 +41,9 @@ public class StreamingConstants {
public static final String SPARK_DRIVER_OVERHEAD = "spark.driver.memoryOverhead";
public static final String SPARK_DRIVER_OVERHEAD_DEFAULT = "1g";
+ public static final String SPARK_KERBEROS_KEYTAB = "spark.kerberos.keytab";
+ public static final String SPARK_KERBEROS_PRINCIPAL = "spark.kerberos.principal";
+
public static final String SPARK_YARN_DIST_JARS = "spark.yarn.dist.jars";
public static final String SPARK_DRIVER_OPTS = "spark.driver.extraJavaOptions";
public static final String SPARK_EXECUTOR_OPTS = "spark.executor.extraJavaOptions";
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
index c0b295154b..d2070f7987 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobListener.java
@@ -86,40 +86,32 @@ public class StreamingJobListener implements SparkAppHandle.Listener {
}
private boolean isFailed(SparkAppHandle.State state) {
- if (SparkAppHandle.State.FAILED == state || SparkAppHandle.State.KILLED == state
- || SparkAppHandle.State.LOST == state) {
- return true;
- }
- return false;
+ return SparkAppHandle.State.FAILED == state || SparkAppHandle.State.KILLED == state
+ || SparkAppHandle.State.LOST == state;
}
private boolean isFinished(SparkAppHandle.State state) {
- if (SparkAppHandle.State.FINISHED == state) {
- return true;
- }
- return false;
+ return SparkAppHandle.State.FINISHED == state;
}
@Override
public void infoChanged(SparkAppHandle handler) {
-
+ // just Override
}
@Subscribe
public void onStreamingJobKill(StreamingJobKillEvent streamingJobKillEvent) {
- val project = streamingJobKillEvent.getProject();
val modelId = streamingJobKillEvent.getModelId();
- StreamingScheduler scheduler = StreamingScheduler.getInstance(project);
+ StreamingScheduler scheduler = StreamingScheduler.getInstance(streamingJobKillEvent.getProject());
scheduler.killJob(modelId, JobTypeEnum.STREAMING_MERGE, JobStatusEnum.STOPPED);
scheduler.killJob(modelId, JobTypeEnum.STREAMING_BUILD, JobStatusEnum.STOPPED);
}
@Subscribe
public void onStreamingJobDrop(StreamingJobDropEvent streamingJobDropEvent) {
- val project = streamingJobDropEvent.getProject();
val modelId = streamingJobDropEvent.getModelId();
val config = KylinConfig.getInstanceFromEnv();
- val mgr = StreamingJobManager.getInstance(config, project);
+ val mgr = StreamingJobManager.getInstance(config, streamingJobDropEvent.getProject());
val buildJobId = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_BUILD.toString());
val mergeJobId = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_MERGE.toString());
mgr.deleteStreamingJob(buildJobId);
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
index f79dd9bec9..7b6e29bd25 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/StreamingJobUtils.java
@@ -18,47 +18,53 @@
package org.apache.kylin.streaming.jobs;
-import static org.apache.kylin.common.exception.ServerErrorCode.READ_KAFKA_JAAS_FILE_ERROR;
+import static org.apache.commons.lang3.StringUtils.INDEX_NOT_FOUND;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.READ_KAFKA_JAAS_FILE_ERROR;
import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_CONFIG_PREFIX;
import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_KAFKA_CONFIG_PREFIX;
import static org.apache.kylin.streaming.constants.StreamingConstants.STREAMING_TABLE_REFRESH_INTERVAL;
import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.security.JaasContext;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.project.NProjectManager;
-import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
+import lombok.SneakyThrows;
import lombok.val;
+import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
@Slf4j
+@UtilityClass
public class StreamingJobUtils {
- private static final Logger logger = Logger.getLogger(StreamingJobUtils.class);
/**
* kylin.properties config -> model config -> job config
- *
- * @return
*/
public static KylinConfig getStreamingKylinConfig(final KylinConfig originalConfig, Map<String, String> jobParams,
String modelId, String project) {
KylinConfigExt kylinConfigExt;
- val dataflowId = modelId;
- if (StringUtils.isNotBlank(dataflowId)) {
+ if (StringUtils.isNotBlank(modelId)) {
val dataflowManager = NDataflowManager.getInstance(originalConfig, project);
- kylinConfigExt = dataflowManager.getDataflow(dataflowId).getConfig();
+ kylinConfigExt = dataflowManager.getDataflow(modelId).getConfig();
} else {
val projectInstance = NProjectManager.getInstance(originalConfig).getProject(project);
kylinConfigExt = projectInstance.getConfig();
@@ -89,22 +95,121 @@ public class StreamingJobUtils {
return KylinConfigExt.createInstance(kylinConfigExt, streamingJobOverrides);
}
- public static String extractKafkaSaslJaasConf() {
+ public static String extractKafkaJaasConf(boolean useAbsKeyTabPath) {
val kapConfig = KapConfig.getInstanceFromEnv();
if (!kapConfig.isKafkaJaasEnabled()) {
return null;
}
- File file = new File(kapConfig.getKafkaJaasConfPath());
+ String jaasOriginText = extractJaasText();
+ if (StringUtils.isEmpty(jaasOriginText)) {
+ return null;
+ }
+ String jaasTextRewrite = rewriteJaasConf(jaasOriginText);
+ return rewriteKeyTab(jaasTextRewrite, useAbsKeyTabPath);
+ }
+
+ /**
+ * extract keytab abs path in kafka jaas
+ */
+ public static String getJaasKeyTabAbsPath() {
+ val kapConfig = KapConfig.getInstanceFromEnv();
+ if (!kapConfig.isKafkaJaasEnabled()) {
+ return null;
+ }
+ String jaasOriginText = extractJaasText();
+ if (StringUtils.isEmpty(jaasOriginText)) {
+ return null;
+ }
+ String jaasRewriteText = rewriteJaasConf(jaasOriginText);
+ String keyTabPath = getKeyTabPathFromJaas(jaasRewriteText);
+ if (StringUtils.isEmpty(keyTabPath)) {
+ return null;
+ }
+ return FileUtils.getFile(keyTabPath).getAbsolutePath();
+ }
+
+ @SneakyThrows
+ public static void createExecutorJaas() {
+ // extract origin kafka jaas file, rewrite keytab path if exists
+ // write it into {KYLIN_HOME}/hadoop_conf
+ val kapConfig = KapConfig.getInstanceFromEnv();
+ if (!kapConfig.isKafkaJaasEnabled()) {
+ return;
+ }
+ String jaasConf = extractKafkaJaasConf(false);
+ if (StringUtils.isEmpty(jaasConf)) {
+ return;
+ }
+ String jaasResultText = "KafkaClient { " + jaasConf + " };";
+ String jaasPath = getExecutorJaasPath();
+ File executorJaasConfFile = FileUtils.getFile(jaasPath);
+ FileUtils.write(executorJaasConfFile, jaasResultText, StandardCharsets.UTF_8, false);
+ log.info("extract kafka jaas file to {}", jaasPath);
+ }
+
+ public static String getExecutorJaasPath() {
+ // {KYLIN_HOME}/hadoop_conf/kafka_jaas.conf
+ return HadoopUtil.getHadoopConfDir() + File.separator + getExecutorJaasName();
+ }
+
+ public static String getExecutorJaasName() {
+ return KapConfig.getInstanceFromEnv().getKafkaJaasConf();
+ }
+
+ /**
+ * read kafka jaas conf
+ */
+ private static String extractJaasText() {
+ val kapConfig = KapConfig.getInstanceFromEnv();
+ File jaasFile = new File(kapConfig.getKafkaJaasConfPath());
+ String jaasOriginText;
try {
- val text = FileUtils.readFileToString(file);
- int kafkaClientIdx = text.indexOf("KafkaClient");
- if (StringUtils.isNotEmpty(text) && kafkaClientIdx != -1) {
- return text.substring(text.indexOf("{") + 1, text.indexOf("}")).trim();
- }
- } catch (Exception e) {
- logger.error("read kafka jaas file error ", e);
+ jaasOriginText = FileUtils.readFileToString(jaasFile, StandardCharsets.UTF_8);
+ } catch (IOException e) {
+ throw new KylinException(READ_KAFKA_JAAS_FILE_ERROR, e);
+ }
+ if (StringUtils.indexOf(jaasOriginText, "KafkaClient") == INDEX_NOT_FOUND) {
+ throw new KylinException(KAFKA_JAAS_FILE_KAFKACLIENT_NOT_EXISTS);
+ }
+ return jaasOriginText;
+ }
+
+ /**
+ * input: KafkaClient { *****; };
+ * output: *****;
+ */
+ private static String rewriteJaasConf(String jaasText) {
+ int start = StringUtils.indexOf(jaasText, '{') + 1;
+ int end = StringUtils.indexOf(jaasText, '}');
+ return StringUtils.substring(jaasText, start, end).trim();
+ }
+
+ private static String rewriteKeyTab(String jaasText, boolean useAbsKeyTabPath) {
+ String keyTabPath = getKeyTabPathFromJaas(jaasText);
+ if (StringUtils.isEmpty(keyTabPath)) {
+ return jaasText;
+ }
+ File keyTabFile = FileUtils.getFile(keyTabPath);
+ String replacement = keyTabFile.getName();
+ if (useAbsKeyTabPath) {
+ replacement = keyTabFile.getAbsolutePath();
+ }
+ log.info("kafka jaas replace {} -> {}", keyTabPath, replacement);
+ return StringUtils.replace(jaasText, keyTabPath, replacement);
+ }
+
+ public static String getKeyTabPathFromJaas(String jaasStr) {
+ Map<String, Password> map = Maps.newHashMap();
+ map.put(SaslConfigs.SASL_JAAS_CONFIG, new Password(jaasStr));
+ val configEntry = JaasContext.loadClientContext(map).configurationEntries().get(0);
+ String keyTabPath = (String) configEntry.getOptions().getOrDefault("keyTab", null);
+ if (StringUtils.isEmpty(keyTabPath)) {
+ return null;
+ }
+ if (!FileUtils.getFile(keyTabPath).exists()) {
+ throw new KylinException(KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS);
}
- throw new KylinException(READ_KAFKA_JAAS_FILE_ERROR, MsgPicker.getMsg().getReadKafkaJaasFileError());
+ return keyTabPath;
}
}
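A minimal sketch of how the new helpers combine at the call sites this commit touches (KafkaUtils for the local consumer, StreamingJobLauncher for the build job); it is illustrative only, and the Properties and SparkLauncher instances are stand-ins for the ones those classes already manage:

    import java.util.Properties;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kylin.streaming.jobs.StreamingJobUtils;
    import org.apache.spark.launcher.SparkLauncher;

    public class KafkaJaasWiringSketch {
        public static void main(String[] args) throws Exception {
            // Local consumer path (KafkaUtils): inline the JAAS body, keeping the absolute keyTab path.
            Properties consumerProps = new Properties();
            String saslJaas = StreamingJobUtils.extractKafkaJaasConf(true);
            if (StringUtils.isNotEmpty(saslJaas)) {
                consumerProps.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaas);
            }

            // Build-job path (StreamingJobLauncher): materialize the executor JAAS file under
            // {KYLIN_HOME}/hadoop_conf and upload the keytab referenced by the JAAS file, if any.
            StreamingJobUtils.createExecutorJaas();
            String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
            SparkLauncher sparkLauncher = new SparkLauncher();
            if (StringUtils.isNotEmpty(keyTabAbsPath)) {
                sparkLauncher.addFile(keyTabAbsPath);
            }
        }
    }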
diff --git a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
index 499e7c92c3..d748f0b4fb 100644
--- a/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
+++ b/src/streaming/src/main/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.java
@@ -34,6 +34,8 @@ import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXEC
import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_MEM;
import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_MEM_DEFAULT;
import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_EXECUTOR_OPTS;
+import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_KERBEROS_KEYTAB;
+import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_KERBEROS_PRINCIPAL;
import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_MASTER;
import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_MASTER_DEFAULT;
import static org.apache.kylin.streaming.constants.StreamingConstants.SPARK_SHUFFLE_PARTITIONS;
@@ -64,7 +66,7 @@ import java.util.stream.Collectors;
import javax.annotation.Nullable;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
@@ -108,6 +110,7 @@ import lombok.extern.slf4j.Slf4j;
public class StreamingJobLauncher extends AbstractSparkJobLauncher {
private static final String KRB5CONF_PROPS = "java.security.krb5.conf";
private static final String JAASCONF_PROPS = "java.security.auth.login.config";
+ private static final String HADOOP_CONF_PATH = "./__spark_conf__/__hadoop_conf__/";
private Map<String, String> jobParams;
private String mainClazz;
private String[] appArgs;
@@ -138,6 +141,8 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
jobParams.getOrDefault(STREAMING_DURATION, STREAMING_DURATION_DEFAULT),
jobParams.getOrDefault(STREAMING_WATERMARK, STREAMING_WATERMARK_DEFAULT),
distMetaStorageUrl.toString() };
+ // build job extract and create kafka jaas file
+ StreamingJobUtils.createExecutorJaas();
break;
}
case STREAMING_MERGE: {
@@ -267,32 +272,6 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
}
}
- private String wrapDriverJavaOptions(Map<String, String> sparkConf) {
- val driverJavaOptsConfigStr = sparkConf.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
-
- Preconditions.checkNotNull(driverJavaOptsConfigStr, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS + " is empty");
- StringBuilder driverJavaOptionsSB = new StringBuilder(driverJavaOptsConfigStr);
- val kapConfig = KapConfig.getInstanceFromEnv();
- if (kapConfig.isKerberosEnabled() && !driverJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
- val krb5conf = " -Djava.security.krb5.conf=" + kapConfig.getKerberosKrb5ConfPath();
- driverJavaOptionsSB.append(krb5conf);
- }
- if (kapConfig.isKafkaJaasEnabled() && !driverJavaOptsConfigStr.contains(JAASCONF_PROPS)) {
- val jaasConf = " -Djava.security.auth.login.config=" + kapConfig.getKafkaJaasConfPath();
- driverJavaOptionsSB.append(jaasConf);
- }
- driverJavaOptionsSB.append(javaPropertyFormatter(REST_SERVER_IP, AddressUtil.getLocalHostExactAddress()));
- driverJavaOptionsSB.append(javaPropertyFormatter("kylin.hdfs.working.dir", config.getHdfsWorkingDirectory()));
- driverJavaOptionsSB
- .append(javaPropertyFormatter("spark.driver.log4j.appender.hdfs.File", getDriverHDFSLogPath()));
- driverJavaOptionsSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
-
- final String driverLog4jXmlFile = config.getLogSparkStreamingDriverPropertiesFile();
- generateLog4jConfiguration(false, driverJavaOptionsSB, driverLog4jXmlFile);
-
- return driverJavaOptionsSB.toString();
- }
-
private void generateLog4jConfiguration(boolean isExecutor, StringBuilder log4jJavaOptionsSB, String log4jXmlFile) {
String log4jConfigStr = "file:" + log4jXmlFile;
@@ -304,47 +283,74 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
log4jJavaOptionsSB.append(javaPropertyFormatter("log4j.configurationFile", log4jConfigStr));
}
- private String wrapExecutorJavaOptions(Map<String, String> sparkConf) {
- val executorJavaOptsConfigStr = sparkConf.get(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS);
+ private String wrapDriverJavaOptions(Map<String, String> sparkConf) {
+ val existOptStr = sparkConf.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
+ Preconditions.checkNotNull(existOptStr, SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS + " is empty");
+
+ StringBuilder driverJavaOptSB = new StringBuilder(existOptStr);
+ val kapConfig = KapConfig.getInstanceFromEnv();
+ rewriteKrb5Conf(driverJavaOptSB, existOptStr, kapConfig.getKerberosKrb5ConfPath());
+ // client driver kafka_jaas use local file
+ // cluster driver use remote kafka_jaas file
+ rewriteKafkaJaasConf(driverJavaOptSB, existOptStr, kapConfig.getKafkaJaasConfPath());
+ driverJavaOptSB.append(javaPropertyFormatter(REST_SERVER_IP, AddressUtil.getLocalHostExactAddress()));
+ driverJavaOptSB.append(javaPropertyFormatter("kylin.hdfs.working.dir", config.getHdfsWorkingDirectory()));
+ driverJavaOptSB.append(javaPropertyFormatter("spark.driver.log4j.appender.hdfs.File", getDriverHDFSLogPath()));
+ driverJavaOptSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
- Preconditions.checkNotNull(executorJavaOptsConfigStr, SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS + " is empty");
+ final String driverLog4jXmlFile = config.getLogSparkStreamingDriverPropertiesFile();
+ generateLog4jConfiguration(false, driverJavaOptSB, driverLog4jXmlFile);
+ return driverJavaOptSB.toString();
+ }
+
+ private String wrapExecutorJavaOptions(Map<String, String> sparkConf) {
+ val existOptionsStr = sparkConf.get(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS);
+ Preconditions.checkNotNull(existOptionsStr, SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS + " is empty");
- StringBuilder executorJavaOptionsSB = new StringBuilder(executorJavaOptsConfigStr);
val kapConfig = KapConfig.getInstanceFromEnv();
- if (kapConfig.isKerberosEnabled() && !executorJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
- val krb5Conf = " -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/"
- + kapConfig.getKerberosKrb5Conf();
- executorJavaOptionsSB.append(krb5Conf);
- }
- if (kapConfig.isKafkaJaasEnabled() && !executorJavaOptsConfigStr.contains(JAASCONF_PROPS)) {
- val jaasConf = " -Djava.security.auth.login.config=./" + kapConfig.getKafkaJaasConf();
- executorJavaOptionsSB.append(jaasConf);
- }
- executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.identifier", jobId));
- executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.jobTimeStamp", currentTimestamp.toString()));
- executorJavaOptionsSB.append(javaPropertyFormatter("kap.spark.project", project));
- executorJavaOptionsSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
+ StringBuilder executorJavaOptSB = new StringBuilder(existOptionsStr);
+ rewriteKrb5Conf(executorJavaOptSB, existOptionsStr, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
+ // executor always use remote kafka_jaas file
+ rewriteKafkaJaasConf(executorJavaOptSB, existOptionsStr,
+ HADOOP_CONF_PATH + StreamingJobUtils.getExecutorJaasName());
+ executorJavaOptSB.append(javaPropertyFormatter("kap.spark.identifier", jobId));
+ executorJavaOptSB.append(javaPropertyFormatter("kap.spark.jobTimeStamp", currentTimestamp.toString()));
+ executorJavaOptSB.append(javaPropertyFormatter("kap.spark.project", project));
+ executorJavaOptSB.append(javaPropertyFormatter("user.timezone", config.getTimeZone()));
if (StringUtils.isNotBlank(config.getMountSparkLogDir())) {
- executorJavaOptionsSB.append(javaPropertyFormatter("job.mountDir", config.getMountSparkLogDir()));
+ executorJavaOptSB.append(javaPropertyFormatter("job.mountDir", config.getMountSparkLogDir()));
}
final String executorLog4jXmlFile = config.getLogSparkStreamingExecutorPropertiesFile();
- generateLog4jConfiguration(true, executorJavaOptionsSB, executorLog4jXmlFile);
-
- return executorJavaOptionsSB.toString();
+ generateLog4jConfiguration(true, executorJavaOptSB, executorLog4jXmlFile);
+ return executorJavaOptSB.toString();
}
private String wrapYarnAmJavaOptions(Map<String, String> sparkConf) {
- val yarnAmJavaOptsConfigStr = sparkConf.getOrDefault(SPARK_YARN_AM_OPTS, "");
+ val existOptStr = sparkConf.getOrDefault(SPARK_YARN_AM_OPTS, "");
+ val kapConfig = KapConfig.getInstanceFromEnv();
+ StringBuilder yarnAmJavaOptSB = new StringBuilder(existOptStr);
+ rewriteKrb5Conf(yarnAmJavaOptSB, existOptStr, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
+ return yarnAmJavaOptSB.toString();
+ }
+
+ private void rewriteKafkaJaasConf(StringBuilder sb, String existOptStr, String value) {
+ KapConfig kapConfig = KapConfig.getInstanceFromEnv();
+ if (!kapConfig.isKafkaJaasEnabled() || !jobType.equals(JobTypeEnum.STREAMING_BUILD)
+ || existOptStr.contains(JAASCONF_PROPS)) {
+ return;
+ }
+ String jaasConf = javaPropertyFormatter(JAASCONF_PROPS, value);
+ sb.append(jaasConf);
+ }
- StringBuilder yarnAmJavaOptionsSB = new StringBuilder(yarnAmJavaOptsConfigStr);
+ private void rewriteKrb5Conf(StringBuilder sb, String existConfStr, String value) {
val kapConfig = KapConfig.getInstanceFromEnv();
- if (kapConfig.isKerberosEnabled() && !yarnAmJavaOptsConfigStr.contains(KRB5CONF_PROPS)) {
- val krb5Conf = " -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/"
- + kapConfig.getKerberosKrb5Conf();
- yarnAmJavaOptionsSB.append(krb5Conf);
+ if (!kapConfig.isKerberosEnabled() || existConfStr.contains(KRB5CONF_PROPS)) {
+ return;
}
- return yarnAmJavaOptionsSB.toString();
+ val krb5Conf = javaPropertyFormatter(KRB5CONF_PROPS, value);
+ sb.append(krb5Conf);
}
private void addParserJar(SparkLauncher sparkLauncher) {
@@ -361,18 +367,22 @@ public class StreamingJobLauncher extends AbstractSparkJobLauncher {
Map<String, String> sparkConf = getStreamingSparkConfig(config);
sparkConf.forEach((key, value) -> launcher.setConf(key, value));
- val numberOfExecutor = sparkConf.getOrDefault(SPARK_EXECUTOR_INSTANCES, SPARK_EXECUTOR_INSTANCES_DEFAULT);
- val numberOfCore = sparkConf.getOrDefault(SPARK_EXECUTOR_CORES, SPARK_EXECUTOR_CORES_DEFAULT);
val sparkLauncher = launcher.setAppName(jobId).setSparkHome(KylinConfig.getSparkHome());
val kapConfig = KapConfig.getInstanceFromEnv();
if (kapConfig.isKerberosEnabled()) {
- sparkLauncher.setConf("spark.kerberos.keytab", kapConfig.getKerberosKeytabPath());
- sparkLauncher.setConf("spark.kerberos.principal", kapConfig.getKerberosPrincipal());
+ sparkLauncher.setConf(SPARK_KERBEROS_KEYTAB, kapConfig.getKerberosKeytabPath());
+ sparkLauncher.setConf(SPARK_KERBEROS_PRINCIPAL, kapConfig.getKerberosPrincipal());
}
- if (kapConfig.isKafkaJaasEnabled()) {
- sparkLauncher.addFile(kapConfig.getKafkaJaasConfPath());
+ if (kapConfig.isKafkaJaasEnabled() && jobType.equals(JobTypeEnum.STREAMING_BUILD)) {
+ String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
+ if (StringUtils.isNotEmpty(keyTabAbsPath)) {
+ // upload keytab in kafka jaas
+ sparkLauncher.addFile(keyTabAbsPath);
+ }
}
addParserJar(sparkLauncher);
+ val numberOfExecutor = sparkConf.getOrDefault(SPARK_EXECUTOR_INSTANCES, SPARK_EXECUTOR_INSTANCES_DEFAULT);
+ val numberOfCore = sparkConf.getOrDefault(SPARK_EXECUTOR_CORES, SPARK_EXECUTOR_CORES_DEFAULT);
sparkLauncher.setMaster(sparkConf.getOrDefault(SPARK_MASTER, SPARK_MASTER_DEFAULT))
.setConf(SPARK_DRIVER_MEM, sparkConf.getOrDefault(SPARK_DRIVER_MEM, SPARK_DRIVER_MEM_DEFAULT))
.setConf(SPARK_EXECUTOR_INSTANCES, numberOfExecutor).setConf(SPARK_EXECUTOR_CORES, numberOfCore)
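Assuming javaPropertyFormatter renders properties as " -Dkey=value" and the default file names krb5.conf and kafka_jaas.conf, the rewritten options for a Kerberos-enabled streaming build job come out roughly as follows (pre-existing options elided):

    spark.driver.extraJavaOptions    ... -Djava.security.krb5.conf=<local krb5.conf path> -Djava.security.auth.login.config=<local kafka_jaas.conf path> ...
    spark.executor.extraJavaOptions  ... -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/krb5.conf -Djava.security.auth.login.config=./__spark_conf__/__hadoop_conf__/kafka_jaas.conf ...
    spark.yarn.am.extraJavaOptions   ... -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/krb5.conf ...

Only STREAMING_BUILD jobs receive the JAAS property; merge jobs and the YARN AM get just the krb5 setting when Kerberos is enabled.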
diff --git a/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala b/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
index 80653ed87c..13fb54a288 100644
--- a/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
+++ b/src/streaming/src/main/scala/org/apache/kylin/streaming/CreateStreamingFlatTable.scala
@@ -71,9 +71,6 @@ class CreateStreamingFlatTable(entry: CreateFlatTableEntry) extends
kafkaJobParams.remove(SASL_MECHANISM);
kafkaJobParams.put("kafka." + SASL_MECHANISM, saslMechanism.get)
}
- val text = StreamingJobUtils.extractKafkaSaslJaasConf
- if (StringUtils.isNotEmpty(text)) kafkaJobParams.put(SaslConfigs.SASL_JAAS_CONFIG, text)
-
kafkaJobParams.foreach { param =>
param._1 match {
case MAX_OFFSETS_PER_TRIGGER => if (param._2.toInt > 0) {
diff --git a/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java b/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
index 13514fca7e..8f86a53c31 100644
--- a/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/kafka/util/KafkaUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.kafka.util;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import java.io.File;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
@@ -109,8 +110,9 @@ public class KafkaUtilsTest extends StreamingTestCase {
val kapConfig = KapConfig.getInstanceFromEnv();
FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
- "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}");
- val text = StreamingJobUtils.extractKafkaSaslJaasConf();
+ "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;};",
+ StandardCharsets.UTF_8);
+ val text = StreamingJobUtils.extractKafkaJaasConf(true);
Assert.assertNull(text);
Pair<Boolean, String> kafkaJaasTextPair = (Pair<Boolean, String>) ReflectionUtils.getField(KafkaUtils.class,
"kafkaJaasTextPair");
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
index bb419f7510..5d0ce9de37 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobListenerTest.java
@@ -27,10 +27,10 @@ import java.util.Optional;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.event.StreamingJobDropEvent;
import org.apache.kylin.streaming.event.StreamingJobKillEvent;
@@ -52,15 +52,15 @@ import lombok.var;
public class StreamingJobListenerTest extends StreamingTestCase {
- private static String PROJECT = "streaming_test";
- private static String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
+ private static final String PROJECT = "streaming_test";
+ private static final String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
@Rule
public ExpectedException thrown = ExpectedException.none();
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Rule
public TestName testName = new TestName();
- private StreamingJobListener eventListener = new StreamingJobListener();
+ private final StreamingJobListener eventListener = new StreamingJobListener();
@Before
public void setUp() throws Exception {
@@ -81,9 +81,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
val listener = new StreamingJobListener(PROJECT, jobId);
val testConfig = getTestConfig();
var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
- mgr.updateStreamingJob(jobId, copyForWrite -> {
- copyForWrite.setSkipListener(true);
- });
+ mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setSkipListener(true));
listener.stateChanged(mockRunningState());
var jobMeta = mgr.getStreamingJobByUuid(jobId);
Assert.assertEquals(JobStatusEnum.RUNNING, jobMeta.getCurrentStatus());
@@ -95,9 +93,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
val jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
val testConfig = getTestConfig();
var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
- mgr.updateStreamingJob(jobId, copyForWrite -> {
- copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
- });
+ mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
val listener = new StreamingJobListener(PROJECT, jobId);
listener.stateChanged(mockFailedState());
var jobMeta = mgr.getStreamingJobByUuid(jobId);
@@ -128,9 +124,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
val jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
val testConfig = getTestConfig();
var mgr = StreamingJobManager.getInstance(testConfig, PROJECT);
- mgr.updateStreamingJob(jobId, copyForWrite -> {
- copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
- });
+ mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
val listener = new StreamingJobListener(PROJECT, jobId);
listener.stateChanged(mockKilledState());
var jobMeta = mgr.getStreamingJobByUuid(jobId);
@@ -172,12 +166,8 @@ public class StreamingJobListenerTest extends StreamingTestCase {
var mgr = StreamingJobManager.getInstance(config, project);
val buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
val mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
- mgr.updateStreamingJob(buildJobId, copyForWrite -> {
- copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
- });
- mgr.updateStreamingJob(mergeJobId, copyForWrite -> {
- copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING);
- });
+ mgr.updateStreamingJob(buildJobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
+ mgr.updateStreamingJob(mergeJobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
var buildJobMeta = mgr.getStreamingJobByUuid(buildJobId);
var mergeJobMeta = mgr.getStreamingJobByUuid(mergeJobId);
Assert.assertEquals(JobStatusEnum.RUNNING, buildJobMeta.getCurrentStatus());
@@ -308,7 +298,7 @@ public class StreamingJobListenerTest extends StreamingTestCase {
@Override
public Optional<Throwable> getError() {
- return null;
+ return Optional.empty();
}
- };
+ }
}
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
index 6c27a82cc4..8a92cdc62b 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/StreamingJobUtilsTest.java
@@ -17,27 +17,29 @@
*/
package org.apache.kylin.streaming.jobs;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS;
+import static org.apache.kylin.common.exception.code.ErrorCodeServer.READ_KAFKA_JAAS_FILE_ERROR;
+
import java.io.File;
+import java.nio.charset.StandardCharsets;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KapConfig;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import lombok.val;
public class StreamingJobUtilsTest extends StreamingTestCase {
private static final String PROJECT = "streaming_test";
private static final String DATAFLOW_ID = "e78a89dd-847f-4574-8afa-8768b4228b73";
- @Rule
- public ExpectedException thrown = ExpectedException.none();
@Before
public void setUp() throws Exception {
@@ -88,24 +90,100 @@ public class StreamingJobUtilsTest extends StreamingTestCase {
@Test
public void testExtractKafkaSaslJaasConf() throws Exception {
val kapConfig = KapConfig.getInstanceFromEnv();
- Assert.assertNull(StreamingJobUtils.extractKafkaSaslJaasConf());
+ Assert.assertNull(StreamingJobUtils.extractKafkaJaasConf(true));
getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
- "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}");
- val text = StreamingJobUtils.extractKafkaSaslJaasConf();
+ "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+ StandardCharsets.UTF_8);
+ val text = StreamingJobUtils.extractKafkaJaasConf(true);
Assert.assertNotNull(text);
getTestConfig().setProperty("kylin.kafka-jaas-conf", "kafka_err_jaas.conf");
File file = new File(kapConfig.getKafkaJaasConfPath());
- FileUtils.write(file, "}4{");
+ FileUtils.write(file, "}4{", StandardCharsets.UTF_8);
try {
- StreamingJobUtils.extractKafkaSaslJaasConf();
+ StreamingJobUtils.extractKafkaJaasConf(true);
} catch (Exception e) {
Assert.assertTrue(e instanceof KylinException);
- Assert.assertEquals("KE-010035015", ((KylinException) e).getErrorCode().getCodeString());
+ Assert.assertEquals("KE-010035217", ((KylinException) e).getErrorCode().getCodeString());
} finally {
FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
}
}
+
+ @Test
+ public void testCheckKeyTabFileUnderJaas() throws Exception {
+ val kapConfig = KapConfig.getInstanceFromEnv();
+ Assert.assertNull(StreamingJobUtils.extractKafkaJaasConf(true));
+ Assert.assertNull(StreamingJobUtils.getJaasKeyTabAbsPath());
+ KylinConfig kylinConfig = getTestConfig();
+ kylinConfig.setProperty("kylin.kafka-jaas.enabled", "true");
+ File testKeyTab = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
+
+ // jaas not exist
+ Assert.assertThrows(READ_KAFKA_JAAS_FILE_ERROR.getMsg(), KylinException.class,
+ () -> StreamingJobUtils.extractKafkaJaasConf(true));
+
+ // jaas keytab key not exist
+ FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+ "KafkaClient { " + "com.sun.security.auth.module.Krb5LoginModule required " + "useKeyTab=true "
+ + "storeKey=true " + "principal=\"kylin@DEV.COM\" " + "serviceName=\"kafka\";" + " };",
+ StandardCharsets.UTF_8);
+ Assert.assertNull(StreamingJobUtils.getJaasKeyTabAbsPath());
+
+ // jaas exist but keytab not exist
+ FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+ "KafkaClient { " + "com.sun.security.auth.module.Krb5LoginModule required " + "useKeyTab=true "
+ + "storeKey=true " + "keyTab=\"" + testKeyTab + "\" " + "principal=\"kylin@DEV.COM\" "
+ + "serviceName=\"kafka\";" + " };",
+ StandardCharsets.UTF_8);
+ Assert.assertThrows(KAFKA_JAAS_FILE_KEYTAB_NOT_EXISTS.getMsg(), KylinException.class,
+ () -> StreamingJobUtils.extractKafkaJaasConf(true));
+
+ // all exist
+ FileUtils.write(testKeyTab, "test", StandardCharsets.UTF_8);
+ val text = StreamingJobUtils.extractKafkaJaasConf(true);
+ Assert.assertNotNull(text);
+ String keyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
+ Assert.assertEquals(testKeyTab.getAbsolutePath(), keyTabAbsPath);
+ String executorJaasName = StreamingJobUtils.getExecutorJaasName();
+ Assert.assertEquals(kapConfig.getKafkaJaasConf(), executorJaasName);
+ String executorJaasPath = StreamingJobUtils.getExecutorJaasPath();
+ Assert.assertEquals(HadoopUtil.getHadoopConfDir() + File.separator + executorJaasName, executorJaasPath);
+ kylinConfig.setProperty("kylin.kafka-jaas-conf", "kafka_err_jaas.conf");
+ }
+
+ @Test
+ public void testCreateExecutorJaas() throws Exception {
+ KapConfig kapConfig = KapConfig.getInstanceFromEnv();
+ String executorJaasPath = HadoopUtil.getHadoopConfDir() + File.separator + kapConfig.getKafkaJaasConf();
+ File executorJaasFile = new File(executorJaasPath);
+ executorJaasFile.deleteOnExit();
+ StreamingJobUtils.createExecutorJaas();
+ Assert.assertFalse(executorJaasFile.exists());
+ getTestConfig().setProperty("kylin.kafka-jaas.enabled", "true");
+
+ {
+ String jaasContext = "KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required; };";
+ FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()), jaasContext, StandardCharsets.UTF_8);
+ StreamingJobUtils.createExecutorJaas();
+ Assert.assertTrue(executorJaasFile.exists());
+ Assert.assertEquals(jaasContext, FileUtils.readFileToString(executorJaasFile, StandardCharsets.UTF_8));
+ }
+
+ {
+ File testKeyTab = new File(KylinConfig.getKylinConfDir() + File.separator + "test.keytab");
+ FileUtils.write(testKeyTab, "test", StandardCharsets.UTF_8);
+ String jaasContext = "KafkaClient { " + "com.sun.security.auth.module.Krb5LoginModule required "
+ + "useKeyTab=true " + "storeKey=true " + "keyTab=\"" + testKeyTab.getAbsolutePath() + "\" "
+ + "principal=\"kylin@DEV.COM\" " + "serviceName=\"kafka\";" + " };";
+ FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()), jaasContext, StandardCharsets.UTF_8);
+ StreamingJobUtils.createExecutorJaas();
+ Assert.assertTrue(executorJaasFile.exists());
+ Assert.assertEquals(jaasContext.replace(testKeyTab.getAbsolutePath(), testKeyTab.getName()),
+ FileUtils.readFileToString(executorJaasFile, StandardCharsets.UTF_8));
+ }
+ executorJaasFile.deleteOnExit();
+ }
}
diff --git a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
index 43251502dd..11640895c5 100644
--- a/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
+++ b/src/streaming/src/test/java/org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.java
@@ -22,6 +22,7 @@ import static org.apache.kylin.streaming.constants.StreamingConstants.DEFAULT_PA
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -186,6 +187,9 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
"-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./kafka_jaas.conf");
val mockup = new MockupSparkLauncher();
ReflectionUtils.setField(launcher, "launcher", mockup);
+ FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+ "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+ StandardCharsets.UTF_8);
launcher.startYarnJob();
Assert.assertNotNull(mockup.sparkConf.get("spark.driver.extraJavaOptions"));
Assert.assertNotNull(mockup.sparkConf.get("spark.executor.extraJavaOptions"));
@@ -205,14 +209,15 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
val kapConfig = KapConfig.getInstanceFromEnv();
config.setProperty("kylin.kafka-jaas.enabled", "true");
FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
- "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}");
+ "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}",
+ StandardCharsets.UTF_8);
val mockup = new MockupSparkLauncher();
ReflectionUtils.setField(launcher, "launcher", mockup);
launcher.startYarnJob();
Assert.assertNotNull(mockup.sparkConf.get("spark.kerberos.keytab"));
Assert.assertNotNull(mockup.sparkConf.get("spark.kerberos.principal"));
- Assert.assertTrue(mockup.files.contains(kapConfig.getKafkaJaasConfPath()));
+ Assert.assertFalse(mockup.files.contains(kapConfig.getKafkaJaasConfPath()));
} finally {
FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
}
@@ -495,6 +500,30 @@ public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
Assert.assertTrue(mockup.jars.contains("default"));
}
+ @Test
+ public void testStartYarnBuildJobWithoutExtraOpts() throws Exception {
+ val config = getTestConfig();
+ val modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
+
+ val launcher = new StreamingJobLauncher();
+ launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
+ config.setProperty("kylin.kerberos.enabled", "true");
+ config.setProperty("kylin.tool.mount-spark-log-dir", ".");
+ val kapConfig = KapConfig.getInstanceFromEnv();
+
+ config.setProperty("kylin.kerberos.enabled", "true");
+ config.setProperty("kylin.kafka-jaas.enabled", "true");
+ val mockup = new MockupSparkLauncher();
+ ReflectionUtils.setField(launcher, "launcher", mockup);
+ FileUtils.write(new File(kapConfig.getKafkaJaasConfPath()),
+ "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}",
+ StandardCharsets.UTF_8);
+ launcher.startYarnJob();
+ Assert.assertNotNull(mockup.sparkConf.get("spark.driver.extraJavaOptions"));
+ Assert.assertNotNull(mockup.sparkConf.get("spark.executor.extraJavaOptions"));
+ Assert.assertNotNull(mockup.sparkConf.get("spark.yarn.am.extraJavaOptions"));
+ }
+
static class MockupSparkLauncher extends SparkLauncher {
private Map<String, String> sparkConf;
private List<String> files;