You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2023/09/10 13:36:22 UTC
[incubator-streampark] 01/01: [Improve] EnvUtils minor improvement
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch env
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 25475f344d9df485bd3c63355ac86b16946e4519
Author: benjobs <be...@apache.org>
AuthorDate: Sun Sep 10 21:36:05 2023 +0800
[Improve] EnvUtils minor improvement
---
.../apache/streampark/common/util/EnvUtils.java | 41 ----------------------
.../streampark/common/util/HadoopConfigUtils.scala | 4 +--
.../common/util/SystemPropertyUtils.scala | 16 +++++++++
.../flink/client/trait/FlinkClientTrait.scala | 4 +--
4 files changed, 20 insertions(+), 45 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
deleted file mode 100644
index a8fd54be3..000000000
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.common.util;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-
-public class EnvUtils {
- public static void setEnv(String name, String value) throws Exception {
- getModifiableEnvironment().put(name, value);
- }
-
- @SuppressWarnings("unchecked")
- private static Map<String, String> getModifiableEnvironment() throws Exception {
- Class<?> pe = Class.forName("java.lang.ProcessEnvironment");
- Method getenv = pe.getDeclaredMethod("getenv");
- getenv.setAccessible(true);
- Object unmodifiableEnvironment = getenv.invoke(null);
- Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap");
- Field m = map.getDeclaredField("m");
- m.setAccessible(true);
- return (Map<String, String>) m.get(unmodifiableEnvironment);
- }
-}
-
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
index 7fa179177..396a8467e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
@@ -32,10 +32,10 @@ import scala.util.{Failure, Success, Try}
/** Hadoop client configuration tools mainly for flink use. */
object HadoopConfigUtils {
- val HADOOP_CLIENT_CONF_FILES: Array[String] =
+ private[this] val HADOOP_CLIENT_CONF_FILES: Array[String] =
Array("core-site.xml", "hdfs-site.xml", "yarn-site.xml")
- val HIVE_CLIENT_CONF_FILES: Array[String] =
+ private[this] val HIVE_CLIENT_CONF_FILES: Array[String] =
Array("core-site.xml", "hdfs-site.xml", "hive-site.xml")
/** Get Hadoop configuration directory path from system. */
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
index 11dad97ff..864f3aa44 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.common.util
import java.io.File
import java.security.{AccessController, PrivilegedAction}
+import java.util
import scala.util.{Failure, Success, Try}
@@ -96,6 +97,21 @@ object SystemPropertyUtils extends Logger {
def set(key: String, value: String): String =
System.getProperties.setProperty(key, value).asInstanceOf[String]
+ @throws[Exception]
+ def setEnv(name: String, value: String): Unit = {
+ val envClass = Class.forName("java.lang.ProcessEnvironment")
+ val getEnv = envClass.getDeclaredMethod("getenv")
+ getEnv.setAccessible(true)
+ val unmodifiableEnvironment = getEnv.invoke(null)
+ val clazz = Class.forName("java.util.Collections$UnmodifiableMap")
+ val field = clazz.getDeclaredField("m")
+ field.setAccessible(true)
+ field
+ .get(unmodifiableEnvironment)
+ .asInstanceOf[util.Map[String, String]]
+ .put(name, value)
+ }
+
def getOrElseUpdate(key: String, default: String): String = {
get(key) match {
case null =>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index e6b16e377..de7907b89 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{ConfigConst, Workspace}
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, RestoreMode}
import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger, SystemPropertyUtils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -97,7 +97,7 @@ trait FlinkClientTrait extends Logger {
if (StringUtils.isBlank(flinkOptPath)) {
logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
val flinkHome = submitRequest.flinkVersion.flinkHome
- EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt");
+ SystemPropertyUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt")
logInfo(
s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
}