You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/01/07 07:24:45 UTC

[incubator-linkis] branch dev-1.0.3 updated: Use current user to read result file, and abstract the file permission in filesystem (#1232)

This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch dev-1.0.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git


The following commit(s) were added to refs/heads/dev-1.0.3 by this push:
     new 0a7ff30  Use current user to read result file, and abstract the file permission in filesystem (#1232)
0a7ff30 is described below

commit 0a7ff30f3b953d1fe34cfd0a540d27dd0fc8053f
Author: Leomax_Sun <28...@qq.com>
AuthorDate: Fri Jan 7 15:23:51 2022 +0800

    Use current user to read result file, and abstract the file permission in filesystem (#1232)
    
    Co-authored-by: SunShun <su...@sea.com>
---
 .../org/apache/linkis/storage/fs/FileSystem.java   | 10 +++++++++-
 .../linkis/storage/fs/impl/HDFSFileSystem.java     | 21 ++++++++++++++++-----
 .../linkis/storage/fs/impl/LocalFileSystem.java    |  6 +++---
 .../resultset/DefaultResultSetFactory.scala        | 17 +++++++++++++----
 .../storage/resultset/ResultSetFactory.scala       |  3 ++-
 .../apache/linkis/storage/source/FileSource.scala  | 22 +++++++++++++++++-----
 6 files changed, 60 insertions(+), 19 deletions(-)

diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/FileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/FileSystem.java
index ad4e7ff..9dc4a31 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/FileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/FileSystem.java
@@ -27,8 +27,16 @@ import java.io.IOException;
 
 public abstract class FileSystem implements Fs{
 
-    protected  String user;
+    protected String user;
+    private String defaultFilePerm = "rwxr-----"; //740
+    private String defaultFolderPerm = "rwxr-x---"; //750
 
+    public String getDefaultFilePerm() {
+        return defaultFilePerm;
+    }
+    public String getDefaultFolderPerm() {
+        return defaultFolderPerm;
+    }
 
     public abstract String listRoot() throws IOException;
 
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
index c37756f..8a6333b 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
@@ -27,6 +27,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
@@ -111,7 +112,9 @@ public class HDFSFileSystem extends FileSystem {
         if (!canExecute(getParentPath(path))) {
             throw new IOException("You have not permission to access path " + path);
         }
-        return fs.mkdirs(new Path(path));
+        boolean result = fs.mkdirs(new Path(path));
+        this.setPermission(new FsPath(path), this.getDefaultFolderPerm());
+        return result;
     }
 
     @Override
@@ -124,7 +127,9 @@ public class HDFSFileSystem extends FileSystem {
         if (!canExecute(parentPath)) {
             throw new IOException("You have not permission to access path " + path);
         }
-        return fs.mkdirs(new Path(path));
+        boolean result = fs.mkdirs(new Path(path));
+        this.setPermission(new FsPath(path), this.getDefaultFolderPerm());
+        return result;
     }
 
 
@@ -237,7 +242,9 @@ public class HDFSFileSystem extends FileSystem {
         if (!overwrite) {
             return fs.append(new Path(path));
         } else {
-            return fs.create(new Path(path), true);
+            FSDataOutputStream out = fs.create(new Path(path), true);
+            this.setPermission(dest, this.getDefaultFilePerm());
+            return out;
         }
     }
 
@@ -246,7 +253,9 @@ public class HDFSFileSystem extends FileSystem {
         if (!canExecute(getParentPath(dest))) {
             throw new IOException("You have not permission to access path " + dest);
         }
-        return fs.createNewFile(new Path(checkHDFSPath(dest)));
+        boolean result = fs.createNewFile(new Path(checkHDFSPath(dest)));
+        this.setPermission(new FsPath(dest), this.getDefaultFilePerm());
+        return result;
     }
 
     @Override
@@ -254,7 +263,9 @@ public class HDFSFileSystem extends FileSystem {
         if (!canExecute(getParentPath(dest))) {
             throw new IOException("You have not permission to access path " + dest);
         }
-        return FileUtil.copy(fs, new Path(checkHDFSPath(origin)), fs, new Path(checkHDFSPath(dest)), false, true, fs.getConf());
+        boolean res = FileUtil.copy(fs, new Path(checkHDFSPath(origin)), fs, new Path(checkHDFSPath(dest)), false, true, fs.getConf());
+        this.setPermission(new FsPath(dest), this.getDefaultFilePerm());
+        return res;
     }
 
     @Override
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/LocalFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/LocalFileSystem.java
index dba72cc..c21c615 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/LocalFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/LocalFileSystem.java
@@ -150,7 +150,7 @@ public class LocalFileSystem extends FileSystem {
                 if(!user.equals(getOwner(dirToMake.getAbsolutePath()))) {
                     setOwner(new FsPath(dirToMake.getAbsolutePath()), user, null);
                 }
-                setPermission(new FsPath(dirToMake.getAbsolutePath()), "rwxr-x---");
+                setPermission(new FsPath(dirToMake.getAbsolutePath()), this.getDefaultFolderPerm());
             } else {
                 return false;
             }
@@ -179,7 +179,7 @@ public class LocalFileSystem extends FileSystem {
         }
         FileUtils.copyFile(new File(origin), file);
         try {
-            setPermission(new FsPath(dest), "rwxr-----");
+            setPermission(new FsPath(dest), this.getDefaultFilePerm());
             if(!user.equals(getOwner(dest))) {
                 setOwner(new FsPath(dest), user, null);
             }
@@ -335,7 +335,7 @@ public class LocalFileSystem extends FileSystem {
         }
         file.createNewFile();
         try {
-            setPermission(new FsPath(dest), "rwxr-----");
+            setPermission(new FsPath(dest), this.getDefaultFilePerm());
             if(!user.equals(getOwner(dest))) {
                 setOwner(new FsPath(dest), user, null);
             }
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
index aeab2c3..e6f5739 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
@@ -5,22 +5,22 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 package org.apache.linkis.storage.resultset
 
 import java.util
 
 import org.apache.linkis.common.io.resultset.ResultSet
-import org.apache.linkis.common.io.{FsPath, MetaData, Record}
+import org.apache.linkis.common.io.{Fs, FsPath, MetaData, Record}
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.storage.FSFactory
 import org.apache.linkis.storage.domain.Dolphin
@@ -65,6 +65,15 @@ class DefaultResultSetFactory extends ResultSetFactory with Logging{
 
   override def getResultSetType:Array[String] = resultTypes
 
+  override def getResultSetByPath(fsPath: FsPath, fs: Fs): ResultSet[_ <: MetaData, _ <: Record] = {
+    val inputStream = fs.read(fsPath)
+    val resultSetType = Dolphin.getType(inputStream)
+    if (StringUtils.isEmpty(resultSetType)) throw new StorageWarnException(51000, s"The file (${fsPath.getPath}) is empty(文件(${fsPath.getPath}) 为空)")
+    Utils.tryQuietly(inputStream.close())
+    //Utils.tryQuietly(fs.close())
+    getResultSetByType(resultSetType)
+  }
+
   override def getResultSetByPath(fsPath: FsPath, proxyUser: String): ResultSet[_ <: MetaData, _ <: Record] = {
     if(fsPath == null ) return null
     info("Get Result Set By Path:" + fsPath.getPath)
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
index 724b011..6052111 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
@@ -18,7 +18,7 @@
 package org.apache.linkis.storage.resultset
 
 import org.apache.linkis.common.io.resultset.ResultSet
-import org.apache.linkis.common.io.{FsPath, MetaData, Record}
+import org.apache.linkis.common.io.{Fs, FsPath, MetaData, Record}
 
 import scala.collection.mutable
 
@@ -27,6 +27,7 @@ import scala.collection.mutable
 trait ResultSetFactory extends scala.AnyRef {
   def getResultSetByType(resultSetType : scala.Predef.String) :ResultSet[_ <: MetaData, _ <: Record]
   def getResultSetByPath(fsPath : FsPath) : ResultSet[_ <: MetaData, _ <: Record]
+  def getResultSetByPath(fsPath: FsPath, fs: Fs) : ResultSet[_ <: MetaData, _ <: Record]
   def getResultSetByContent(content : scala.Predef.String) : ResultSet[_ <: MetaData, _ <: Record]
   def exists(resultSetType : scala.Predef.String) : scala.Boolean
   def isResultSetPath(path : scala.Predef.String) : scala.Boolean
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/source/FileSource.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/source/FileSource.scala
index af6cb88..f13a0f5 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/source/FileSource.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/source/FileSource.scala
@@ -85,7 +85,12 @@ object FileSource {
   }
 
   def create(fsPath: FsPath, fs: Fs): FileSource = {
-    create(fsPath, fs.read(fsPath))
+    if (!canRead(fsPath.getPath)) throw new StorageErrorException(54001, "Unsupported open file type(不支持打开的文件类型)")
+    if (isResultSet(fsPath)) {
+      new ResultsetFileSource(Array(createResultSetFileSplit(fsPath, fs)))
+    } else {
+      new TextFileSource(Array(createTextFileSplit(fsPath, fs)))
+    }
   }
 
   def create(fsPath: FsPath, is: InputStream): FileSource = {
@@ -97,21 +102,28 @@ object FileSource {
     }
   }
 
-  private def createResultSetFileSplit(fsPath: FsPath, fs: Fs): FileSplit = {
-    createResultSetFileSplit(fsPath, fs.read(fsPath))
-  }
-
   private def createResultSetFileSplit(fsPath: FsPath, is: InputStream): FileSplit = {
     val resultset = ResultSetFactory.getInstance.getResultSetByPath(fsPath)
     val resultsetReader = ResultSetReader.getResultSetReader(resultset, is)
     new FileSplit(resultsetReader, resultset.resultSetType())
   }
 
+  private def createResultSetFileSplit(fsPath: FsPath, fs: Fs): FileSplit = {
+    val resultset = ResultSetFactory.getInstance.getResultSetByPath(fsPath, fs)
+    val resultsetReader = ResultSetReader.getResultSetReader(resultset, fs.read(fsPath))
+    new FileSplit(resultsetReader, resultset.resultSetType())
+  }
+
   private def createTextFileSplit(fsPath: FsPath, is: InputStream): FileSplit = {
     val scriptFsReader = ScriptFsReader.getScriptFsReader(fsPath, StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue, is)
     new FileSplit(scriptFsReader)
   }
 
+  private def createTextFileSplit(fsPath: FsPath, fs: Fs): FileSplit = {
+    val scriptFsReader = ScriptFsReader.getScriptFsReader(fsPath, StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue, fs.read(fsPath))
+    new FileSplit(scriptFsReader)
+  }
+
   private def canRead(path: String): Boolean = {
     fileType.exists(suffixPredicate(path, _))
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org