You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@linkis.apache.org by pe...@apache.org on 2022/01/07 07:24:45 UTC
[incubator-linkis] branch dev-1.0.3 updated: Use current user to read result file, and abstract the file permission in filesystem (#1232)
This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch dev-1.0.3
in repository https://gitbox.apache.org/repos/asf/incubator-linkis.git
The following commit(s) were added to refs/heads/dev-1.0.3 by this push:
new 0a7ff30 Use current user to read result file, and abstract the file permission in filesystem (#1232)
0a7ff30 is described below
commit 0a7ff30f3b953d1fe34cfd0a540d27dd0fc8053f
Author: Leomax_Sun <28...@qq.com>
AuthorDate: Fri Jan 7 15:23:51 2022 +0800
Use current user to read result file, and abstract the file permission in filesystem (#1232)
Co-authored-by: SunShun <su...@sea.com>
---
.../org/apache/linkis/storage/fs/FileSystem.java | 10 +++++++++-
.../linkis/storage/fs/impl/HDFSFileSystem.java | 21 ++++++++++++++++-----
.../linkis/storage/fs/impl/LocalFileSystem.java | 6 +++---
.../resultset/DefaultResultSetFactory.scala | 17 +++++++++++++----
.../storage/resultset/ResultSetFactory.scala | 3 ++-
.../apache/linkis/storage/source/FileSource.scala | 22 +++++++++++++++++-----
6 files changed, 60 insertions(+), 19 deletions(-)
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/FileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/FileSystem.java
index ad4e7ff..9dc4a31 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/FileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/FileSystem.java
@@ -27,8 +27,16 @@ import java.io.IOException;
public abstract class FileSystem implements Fs{
- protected String user;
+ protected String user;
+ private String defaultFilePerm = "rwxr-----"; //740
+ private String defaultFolderPerm = "rwxr-x---"; //750
+ public String getDefaultFilePerm() {
+ return defaultFilePerm;
+ }
+ public String getDefaultFolderPerm() {
+ return defaultFolderPerm;
+ }
public abstract String listRoot() throws IOException;
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
index c37756f..8a6333b 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/HDFSFileSystem.java
@@ -27,6 +27,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@@ -111,7 +112,9 @@ public class HDFSFileSystem extends FileSystem {
if (!canExecute(getParentPath(path))) {
throw new IOException("You have not permission to access path " + path);
}
- return fs.mkdirs(new Path(path));
+ boolean result = fs.mkdirs(new Path(path));
+ this.setPermission(new FsPath(path), this.getDefaultFolderPerm());
+ return result;
}
@Override
@@ -124,7 +127,9 @@ public class HDFSFileSystem extends FileSystem {
if (!canExecute(parentPath)) {
throw new IOException("You have not permission to access path " + path);
}
- return fs.mkdirs(new Path(path));
+ boolean result = fs.mkdirs(new Path(path));
+ this.setPermission(new FsPath(path), this.getDefaultFolderPerm());
+ return result;
}
@@ -237,7 +242,9 @@ public class HDFSFileSystem extends FileSystem {
if (!overwrite) {
return fs.append(new Path(path));
} else {
- return fs.create(new Path(path), true);
+ FSDataOutputStream out = fs.create(new Path(path), true);
+ this.setPermission(dest, this.getDefaultFilePerm());
+ return out;
}
}
@@ -246,7 +253,9 @@ public class HDFSFileSystem extends FileSystem {
if (!canExecute(getParentPath(dest))) {
throw new IOException("You have not permission to access path " + dest);
}
- return fs.createNewFile(new Path(checkHDFSPath(dest)));
+ boolean result = fs.createNewFile(new Path(checkHDFSPath(dest)));
+ this.setPermission(new FsPath(dest), this.getDefaultFilePerm());
+ return result;
}
@Override
@@ -254,7 +263,9 @@ public class HDFSFileSystem extends FileSystem {
if (!canExecute(getParentPath(dest))) {
throw new IOException("You have not permission to access path " + dest);
}
- return FileUtil.copy(fs, new Path(checkHDFSPath(origin)), fs, new Path(checkHDFSPath(dest)), false, true, fs.getConf());
+ boolean res = FileUtil.copy(fs, new Path(checkHDFSPath(origin)), fs, new Path(checkHDFSPath(dest)), false, true, fs.getConf());
+ this.setPermission(new FsPath(dest), this.getDefaultFilePerm());
+ return res;
}
@Override
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/LocalFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/LocalFileSystem.java
index dba72cc..c21c615 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/LocalFileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/LocalFileSystem.java
@@ -150,7 +150,7 @@ public class LocalFileSystem extends FileSystem {
if(!user.equals(getOwner(dirToMake.getAbsolutePath()))) {
setOwner(new FsPath(dirToMake.getAbsolutePath()), user, null);
}
- setPermission(new FsPath(dirToMake.getAbsolutePath()), "rwxr-x---");
+ setPermission(new FsPath(dirToMake.getAbsolutePath()), this.getDefaultFolderPerm());
} else {
return false;
}
@@ -179,7 +179,7 @@ public class LocalFileSystem extends FileSystem {
}
FileUtils.copyFile(new File(origin), file);
try {
- setPermission(new FsPath(dest), "rwxr-----");
+ setPermission(new FsPath(dest), this.getDefaultFilePerm());
if(!user.equals(getOwner(dest))) {
setOwner(new FsPath(dest), user, null);
}
@@ -335,7 +335,7 @@ public class LocalFileSystem extends FileSystem {
}
file.createNewFile();
try {
- setPermission(new FsPath(dest), "rwxr-----");
+ setPermission(new FsPath(dest), this.getDefaultFilePerm());
if(!user.equals(getOwner(dest))) {
setOwner(new FsPath(dest), user, null);
}
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
index aeab2c3..e6f5739 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/DefaultResultSetFactory.scala
@@ -5,22 +5,22 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.linkis.storage.resultset
import java.util
import org.apache.linkis.common.io.resultset.ResultSet
-import org.apache.linkis.common.io.{FsPath, MetaData, Record}
+import org.apache.linkis.common.io.{Fs, FsPath, MetaData, Record}
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.storage.FSFactory
import org.apache.linkis.storage.domain.Dolphin
@@ -65,6 +65,15 @@ class DefaultResultSetFactory extends ResultSetFactory with Logging{
override def getResultSetType:Array[String] = resultTypes
+ override def getResultSetByPath(fsPath: FsPath, fs: Fs): ResultSet[_ <: MetaData, _ <: Record] = {
+ val inputStream = fs.read(fsPath)
+ val resultSetType = Dolphin.getType(inputStream)
+ if (StringUtils.isEmpty(resultSetType)) throw new StorageWarnException(51000, s"The file (${fsPath.getPath}) is empty(文件(${fsPath.getPath}) 为空)")
+ Utils.tryQuietly(inputStream.close())
+ //Utils.tryQuietly(fs.close())
+ getResultSetByType(resultSetType)
+ }
+
override def getResultSetByPath(fsPath: FsPath, proxyUser: String): ResultSet[_ <: MetaData, _ <: Record] = {
if(fsPath == null ) return null
info("Get Result Set By Path:" + fsPath.getPath)
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
index 724b011..6052111 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/resultset/ResultSetFactory.scala
@@ -18,7 +18,7 @@
package org.apache.linkis.storage.resultset
import org.apache.linkis.common.io.resultset.ResultSet
-import org.apache.linkis.common.io.{FsPath, MetaData, Record}
+import org.apache.linkis.common.io.{Fs, FsPath, MetaData, Record}
import scala.collection.mutable
@@ -27,6 +27,7 @@ import scala.collection.mutable
trait ResultSetFactory extends scala.AnyRef {
def getResultSetByType(resultSetType : scala.Predef.String) :ResultSet[_ <: MetaData, _ <: Record]
def getResultSetByPath(fsPath : FsPath) : ResultSet[_ <: MetaData, _ <: Record]
+ def getResultSetByPath(fsPath: FsPath, fs: Fs) : ResultSet[_ <: MetaData, _ <: Record]
def getResultSetByContent(content : scala.Predef.String) : ResultSet[_ <: MetaData, _ <: Record]
def exists(resultSetType : scala.Predef.String) : scala.Boolean
def isResultSetPath(path : scala.Predef.String) : scala.Boolean
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/source/FileSource.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/source/FileSource.scala
index af6cb88..f13a0f5 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/source/FileSource.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/source/FileSource.scala
@@ -85,7 +85,12 @@ object FileSource {
}
def create(fsPath: FsPath, fs: Fs): FileSource = {
- create(fsPath, fs.read(fsPath))
+ if (!canRead(fsPath.getPath)) throw new StorageErrorException(54001, "Unsupported open file type(不支持打开的文件类型)")
+ if (isResultSet(fsPath)) {
+ new ResultsetFileSource(Array(createResultSetFileSplit(fsPath, fs)))
+ } else {
+ new TextFileSource(Array(createTextFileSplit(fsPath, fs)))
+ }
}
def create(fsPath: FsPath, is: InputStream): FileSource = {
@@ -97,21 +102,28 @@ object FileSource {
}
}
- private def createResultSetFileSplit(fsPath: FsPath, fs: Fs): FileSplit = {
- createResultSetFileSplit(fsPath, fs.read(fsPath))
- }
-
private def createResultSetFileSplit(fsPath: FsPath, is: InputStream): FileSplit = {
val resultset = ResultSetFactory.getInstance.getResultSetByPath(fsPath)
val resultsetReader = ResultSetReader.getResultSetReader(resultset, is)
new FileSplit(resultsetReader, resultset.resultSetType())
}
+ private def createResultSetFileSplit(fsPath: FsPath, fs: Fs): FileSplit = {
+ val resultset = ResultSetFactory.getInstance.getResultSetByPath(fsPath, fs)
+ val resultsetReader = ResultSetReader.getResultSetReader(resultset, fs.read(fsPath))
+ new FileSplit(resultsetReader, resultset.resultSetType())
+ }
+
private def createTextFileSplit(fsPath: FsPath, is: InputStream): FileSplit = {
val scriptFsReader = ScriptFsReader.getScriptFsReader(fsPath, StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue, is)
new FileSplit(scriptFsReader)
}
+ private def createTextFileSplit(fsPath: FsPath, fs: Fs): FileSplit = {
+ val scriptFsReader = ScriptFsReader.getScriptFsReader(fsPath, StorageConfiguration.STORAGE_RS_FILE_TYPE.getValue, fs.read(fsPath))
+ new FileSplit(scriptFsReader)
+ }
+
private def canRead(path: String): Boolean = {
fileType.exists(suffixPredicate(path, _))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@linkis.apache.org
For additional commands, e-mail: commits-help@linkis.apache.org