You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2023/09/10 13:36:22 UTC

[incubator-streampark] 01/01: [Improve] EnvUtils minor improvement

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch env
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 25475f344d9df485bd3c63355ac86b16946e4519
Author: benjobs <be...@apache.org>
AuthorDate: Sun Sep 10 21:36:05 2023 +0800

    [Improve] EnvUtils minor improvement
---
 .../apache/streampark/common/util/EnvUtils.java    | 41 ----------------------
 .../streampark/common/util/HadoopConfigUtils.scala |  4 +--
 .../common/util/SystemPropertyUtils.scala          | 16 +++++++++
 .../flink/client/trait/FlinkClientTrait.scala      |  4 +--
 4 files changed, 20 insertions(+), 45 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
deleted file mode 100644
index a8fd54be3..000000000
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.common.util;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-
-public class EnvUtils {
-    public static void setEnv(String name, String value) throws Exception {
-        getModifiableEnvironment().put(name, value);
-    }
-
-    @SuppressWarnings("unchecked")
-    private static Map<String, String> getModifiableEnvironment() throws Exception {
-        Class<?> pe = Class.forName("java.lang.ProcessEnvironment");
-        Method getenv = pe.getDeclaredMethod("getenv");
-        getenv.setAccessible(true);
-        Object unmodifiableEnvironment = getenv.invoke(null);
-        Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap");
-        Field m = map.getDeclaredField("m");
-        m.setAccessible(true);
-        return (Map<String, String>) m.get(unmodifiableEnvironment);
-    }
-}
-
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
index 7fa179177..396a8467e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
@@ -32,10 +32,10 @@ import scala.util.{Failure, Success, Try}
 /** Hadoop client configuration tools mainly for flink use. */
 object HadoopConfigUtils {
 
-  val HADOOP_CLIENT_CONF_FILES: Array[String] =
+  private[this] val HADOOP_CLIENT_CONF_FILES: Array[String] =
     Array("core-site.xml", "hdfs-site.xml", "yarn-site.xml")
 
-  val HIVE_CLIENT_CONF_FILES: Array[String] =
+  private[this] val HIVE_CLIENT_CONF_FILES: Array[String] =
     Array("core-site.xml", "hdfs-site.xml", "hive-site.xml")
 
   /** Get Hadoop configuration directory path from system. */
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
index 11dad97ff..864f3aa44 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.common.util
 
 import java.io.File
 import java.security.{AccessController, PrivilegedAction}
+import java.util
 
 import scala.util.{Failure, Success, Try}
 
@@ -96,6 +97,21 @@ object SystemPropertyUtils extends Logger {
   def set(key: String, value: String): String =
     System.getProperties.setProperty(key, value).asInstanceOf[String]
 
+  @throws[Exception]
+  def setEnv(name: String, value: String): Unit = {
+    val envClass = Class.forName("java.lang.ProcessEnvironment")
+    val getEnv = envClass.getDeclaredMethod("getenv")
+    getEnv.setAccessible(true)
+    val unmodifiableEnvironment = getEnv.invoke(null)
+    val clazz = Class.forName("java.util.Collections$UnmodifiableMap")
+    val field = clazz.getDeclaredField("m")
+    field.setAccessible(true)
+    field
+      .get(unmodifiableEnvironment)
+      .asInstanceOf[util.Map[String, String]]
+      .put(name, value)
+  }
+
   def getOrElseUpdate(key: String, default: String): String = {
     get(key) match {
       case null =>
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index e6b16e377..de7907b89 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{ConfigConst, Workspace}
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, RestoreMode}
 import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger, SystemPropertyUtils}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
 import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -97,7 +97,7 @@ trait FlinkClientTrait extends Logger {
         if (StringUtils.isBlank(flinkOptPath)) {
           logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
           val flinkHome = submitRequest.flinkVersion.flinkHome
-          EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt");
+          SystemPropertyUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt")
           logInfo(
             s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
         }