Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/18 14:14:52 UTC

[incubator-doris] branch master updated: [HttpV2] Add more httpv2 APIs (#6210)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a4b1622  [HttpV2] Add more httpv2 APIs (#6210)
a4b1622 is described below

commit a4b1622cebb10e9fbce0c9e5b4c4598f0bc08e50
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sun Jul 18 22:14:42 2021 +0800

    [HttpV2] Add more httpv2 APIs (#6210)
    
    1. /api/cluster_overview to view statistics of the cluster
    2. /api/meta/ to view the database/table schema
    3. /api/import/file_review to preview the content of files in CSV or PARQUET format.
---
 .../src/main/java/org/apache/doris/PaloFe.java     |   1 +
 .../doris/common/parquet/BrokerInputFile.java      | 272 ++++++++++++++
 .../doris/common/parquet/LocalInputFile.java       | 197 +++++++++++
 .../apache/doris/common/parquet/ParquetReader.java | 136 +++++++
 .../org/apache/doris/common/util/BrokerReader.java | 166 +++++++++
 .../org/apache/doris/common/util/BrokerUtil.java   |  10 +-
 .../apache/doris/httpv2/restv2/ImportAction.java   | 228 ++++++++++++
 .../doris/httpv2/restv2/MetaInfoActionV2.java      | 392 +++++++++++++++++++++
 .../doris/httpv2/restv2/StatisticAction.java       |  89 +++++
 .../doris/load/loadv2/SparkEtlJobHandler.java      |  15 +-
 .../doris/common/parquet/ParquetReaderTest.java    |  61 ++++
 .../apache/doris/common/util/BrokerUtilTest.java   |  19 +-
 .../doris/load/loadv2/SparkEtlJobHandlerTest.java  |   7 +-
 fe/pom.xml                                         |   8 +
 14 files changed, 1579 insertions(+), 22 deletions(-)
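
For readers who want to try the new endpoints, here is a minimal sketch of calling the
file preview API from plain Java. The FE host, the HTTP port 8030 and the absence of
authentication are assumptions for illustration, not part of this commit; the request
body mirrors the javadoc of ImportAction.fileReview below.

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Scanner;

    public class FileReviewExample {
        public static void main(String[] args) throws Exception {
            String body = "{\"fileInfo\": {\"columnSeparator\": \",\","
                    + " \"fileUrl\": \"hdfs://127.0.0.1:50070/file/test/text*\", \"format\": \"CSV\"},"
                    + " \"connectInfo\": {\"brokerName\": \"my_broker\","
                    + " \"brokerProps\": {\"username\": \"yyy\", \"password\": \"xxx\"}}}";
            URL url = new URL("http://127.0.0.1:8030/rest/v2/api/import/file_review"); // host/port assumed
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setDoOutput(true);
            try (OutputStream os = conn.getOutputStream()) {
                os.write(body.getBytes(StandardCharsets.UTF_8));
            }
            // Read the whole response body as one token.
            try (Scanner s = new Scanner(conn.getInputStream(), "UTF-8").useDelimiter("\\A")) {
                System.out.println(s.hasNext() ? s.next() : "");
            }
        }
    }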

diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
index ab45c7a..9d4cc9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
@@ -313,3 +313,4 @@ public class PaloFe {
     }
 }
 
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/parquet/BrokerInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/BrokerInputFile.java
new file mode 100644
index 0000000..f2aa3d6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/BrokerInputFile.java
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.parquet;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.util.BrokerReader;
+import org.apache.doris.thrift.TBrokerFD;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BrokerInputFile implements InputFile {
+    private static final Logger LOG = LogManager.getLogger(BrokerInputFile.class);
+
+    private static final int COPY_BUFFER_SIZE = 1024 * 1024;
+
+    private BrokerDesc brokerDesc;
+    private String filePath;
+    private long fileLength = 0;
+    private BrokerReader reader;
+    private TBrokerFD fd;
+
+    public static BrokerInputFile create(String filePath, BrokerDesc brokerDesc) throws IOException {
+        BrokerInputFile inputFile = new BrokerInputFile(filePath, brokerDesc);
+        inputFile.init();
+        return inputFile;
+    }
+
+    // For test only. The given ip/port is the broker address.
+    public static BrokerInputFile create(String filePath, BrokerDesc brokerDesc, String ip, int port) throws IOException {
+        BrokerInputFile inputFile = new BrokerInputFile(filePath, brokerDesc);
+        inputFile.init(ip, port);
+        return inputFile;
+    }
+
+    private BrokerInputFile(String filePath, BrokerDesc brokerDesc) {
+        this.filePath = filePath;
+        this.brokerDesc = brokerDesc;
+    }
+
+    private void init() throws IOException {
+        this.reader = BrokerReader.create(this.brokerDesc);
+        this.fileLength = this.reader.getFileLength(filePath);
+        this.fd = this.reader.open(filePath);
+    }
+
+    // For test only. The given ip/port is the broker address.
+    private void init(String ip, int port) throws IOException {
+        this.reader = BrokerReader.create(this.brokerDesc, ip, port);
+        this.fileLength = this.reader.getFileLength(filePath);
+        this.fd = this.reader.open(filePath);
+    }
+
+    @Override
+    public long getLength() throws IOException {
+        return fileLength;
+    }
+
+    @Override
+    public SeekableInputStream newStream() throws IOException {
+        return new SeekableInputStream() {
+            private final byte[] tmpBuf = new byte[COPY_BUFFER_SIZE];
+            private long currentPos = 0;
+            private long markPos = 0;
+
+            private long bufferOffset = 0;
+            private long bufferLimit = 0;
+
+            @Override
+            public int read() throws IOException {
+                try {
+                    // bufferLimit is exclusive: refill when currentPos is outside [bufferOffset, bufferLimit)
+                    if (currentPos < bufferOffset || currentPos >= bufferLimit) {
+                        bufferOffset = currentPos;
+                        fill();
+                    }
+                    if (currentPos >= bufferLimit) {
+                        LOG.warn("current pos {} is not below buffer limit {} after fill. should not happen.", currentPos, bufferLimit);
+                        return -1;
+                    }
+
+                    int pos = (int) (currentPos - bufferOffset);
+                    int res = Byte.toUnsignedInt(tmpBuf[pos]);
+                    ++currentPos;
+                    return res;
+                } catch (BrokerReader.EOFException e) {
+                    return -1;
+                }
+            }
+
+            private void fill() throws IOException, BrokerReader.EOFException {
+                byte[] data = reader.pread(fd, bufferOffset, COPY_BUFFER_SIZE);
+                System.arraycopy(data, 0, tmpBuf, 0, data.length);
+                bufferLimit = bufferOffset + data.length;
+            }
+
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public int read(byte[] b) throws IOException {
+                try {
+                    byte[] data = reader.pread(fd, currentPos, b.length);
+                    System.arraycopy(data, 0, b, 0, data.length);
+                    currentPos += data.length;
+                    return data.length;
+                } catch (BrokerReader.EOFException e) {
+                    return -1;
+                }
+            }
+
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                try {
+                    if (currentPos < bufferOffset || currentPos > bufferLimit || currentPos + len > bufferLimit) {
+                        if (len > COPY_BUFFER_SIZE) {
+                            // the data to be read is larger than the max buffer size.
+                            // read it directly, bypassing the buffer.
+                            byte[] data = reader.pread(fd, currentPos, len);
+                            System.arraycopy(data, 0, b, off, data.length);
+                            currentPos += data.length;
+                            return data.length;
+                        }
+                        // fill the buffer first
+                        bufferOffset = currentPos;
+                        fill();
+                    }
+
+                    if (currentPos > bufferLimit) {
+                        LOG.warn("current pos {} is larger than buffer limit {}. should not happen.", currentPos, bufferLimit);
+                        return -1;
+                    }
+
+                    int start = (int) (currentPos - bufferOffset);
+                    // cap the copy at the valid end of the buffered data
+                    int readLen = Math.min(len, (int) (bufferLimit - currentPos));
+                    System.arraycopy(tmpBuf, start, b, off, readLen);
+                    currentPos += readLen;
+                    return readLen;
+                } catch (BrokerReader.EOFException e) {
+                    return -1;
+                }
+            }
+
+            @Override
+            public long skip(long n) throws IOException {
+                final long left = fileLength - currentPos;
+                long min = Math.min(n, left);
+                currentPos += min;
+                return min;
+            }
+
+            @Override
+            public int available() throws IOException {
+                return 0;
+            }
+
+            @Override
+            public void close() throws IOException {
+                reader.close(fd);
+            }
+
+            @SuppressWarnings({"unchecked", "unused", "UnusedReturnValue"})
+            private <T extends Throwable, R> R uncheckedExceptionThrow(Throwable t) throws T {
+                throw (T) t;
+            }
+
+            @Override
+            public synchronized void mark(int readlimit) {
+                markPos = currentPos;
+            }
+
+            @Override
+            public synchronized void reset() throws IOException {
+                currentPos = markPos;
+            }
+
+            @Override
+            public boolean markSupported() {
+                return true;
+            }
+
+            @Override
+            public long getPos() throws IOException {
+                return currentPos;
+            }
+
+            @Override
+            public void seek(long l) throws IOException {
+                currentPos = l;
+            }
+
+            @Override
+            public void readFully(byte[] bytes) throws IOException {
+                try {
+                    byte[] data = reader.pread(fd, currentPos, bytes.length);
+                    System.arraycopy(data, 0, bytes, 0, data.length);
+                    currentPos += data.length;
+                    if (data.length < bytes.length) {
+                        throw new EOFException("Reach the end of file with " + (bytes.length - data.length)
+                                + " bytes left to read");
+                    }
+                } catch (BrokerReader.EOFException e) {
+                    throw new EOFException("Reach the end of file");
+                }
+            }
+
+            @Override
+            public void readFully(byte[] bytes, int offset, int len) throws IOException {
+                try {
+                    byte[] data = reader.pread(fd, currentPos, len);
+                    System.arraycopy(data, 0, bytes, offset, data.length);
+                    currentPos += data.length;
+                    if (data.length < len) {
+                        throw new EOFException("Reach the end of file with " + (len - data.length)
+                                + " bytes left to read");
+                    }
+                } catch (BrokerReader.EOFException e) {
+                    throw new EOFException("Reach the end of file");
+                }
+            }
+
+            @Override
+            public int read(ByteBuffer byteBuffer) throws IOException {
+                try {
+                    byte[] data = reader.pread(fd, currentPos, byteBuffer.remaining());
+                    byteBuffer.put(data);
+                    currentPos += data.length;
+                    return data.length;
+                } catch (BrokerReader.EOFException e) {
+                    return -1;
+                }
+            }
+
+            @Override
+            public void readFully(ByteBuffer byteBuffer) throws IOException {
+                long markCurPos = currentPos;
+                while (byteBuffer.remaining() > 0) {
+                    try {
+                        byte[] data = reader.pread(fd, currentPos, byteBuffer.remaining());
+                        byteBuffer.put(data);
+                        currentPos += data.length;
+                    } catch (BrokerReader.EOFException e) {
+                        if (byteBuffer.remaining() > 0) {
+                            throw new EOFException("Reach the end of file with " + byteBuffer.remaining() + " bytes left to read. "
+                                + "read len: " + (currentPos - markCurPos));
+                        }
+                    }
+                }
+            }
+        }; // end of new SeekableInputStream
+    } // end of newStream
+}
+
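
BrokerInputFile implements parquet-mr's InputFile so that ParquetFileReader can read a
remote file through a Doris broker without downloading it first. A minimal sketch of the
wiring, assuming a reachable broker; the path, broker name and properties are
placeholders:

    import org.apache.doris.analysis.BrokerDesc;
    import org.apache.doris.common.parquet.BrokerInputFile;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.hadoop.ParquetFileReader;

    import java.util.HashMap;
    import java.util.Map;

    public class BrokerParquetSchemaExample {
        public static void main(String[] args) throws Exception {
            Map<String, String> brokerProps = new HashMap<>(); // e.g. username/password
            BrokerDesc brokerDesc = new BrokerDesc("my_broker", brokerProps);
            BrokerInputFile inputFile = BrokerInputFile.create("hdfs://host:port/path/file.parquet", brokerDesc);
            ParquetFileReader reader = new ParquetFileReader(inputFile, ParquetReadOptions.builder().build());
            System.out.println(reader.getFileMetaData().getSchema());
            reader.close();
        }
    }

This is exactly how ParquetReader below consumes it; the buffered stream returned by
newStream() avoids one broker RPC per byte for the many small reads parquet-mr issues
while parsing the footer.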
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/parquet/LocalInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/LocalInputFile.java
new file mode 100644
index 0000000..6a2cf92
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/LocalInputFile.java
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.parquet;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+/**
+ * Modified version of
+ * https://github.com/tideworks/arvo2parquet/blob/master/src/main/java/com/tideworks/data_load/io/InputFile.java
+ */
+public class LocalInputFile implements InputFile {
+    private static final Logger LOG = LogManager.getLogger(LocalInputFile.class);
+
+    private static final int COPY_BUFFER_SIZE = 8192;
+    private final RandomAccessFile input;
+
+    public LocalInputFile(File file) throws FileNotFoundException {
+        this.input = new RandomAccessFile(file, "r");
+    }
+
+    private static int readDirectBuffer(ByteBuffer byteBufr, byte[] tmpBuf, ByteBufReader rdr)
+            throws IOException {
+        // copy all the bytes that return immediately, stopping at the first
+        // read that doesn't return a full buffer.
+        int nextReadLength = Math.min(byteBufr.remaining(), tmpBuf.length);
+        int totalBytesRead = 0;
+        int bytesRead;
+
+        while ((bytesRead = rdr.read(tmpBuf, 0, nextReadLength)) == tmpBuf.length) {
+            byteBufr.put(tmpBuf);
+            totalBytesRead += bytesRead;
+            nextReadLength = Math.min(byteBufr.remaining(), tmpBuf.length);
+        }
+
+        if (bytesRead < 0) {
+            // return -1 if nothing was read
+            return totalBytesRead == 0 ? -1 : totalBytesRead;
+        } else {
+            // copy the last partial buffer
+            byteBufr.put(tmpBuf, 0, bytesRead);
+            totalBytesRead += bytesRead;
+            return totalBytesRead;
+        }
+    }
+
+    private static void readFullyDirectBuffer(ByteBuffer byteBufr, byte[] tmpBuf, ByteBufReader rdr)
+            throws IOException {
+        int nextReadLength = Math.min(byteBufr.remaining(), tmpBuf.length);
+        int bytesRead = 0;
+
+        while (nextReadLength > 0 && (bytesRead = rdr.read(tmpBuf, 0, nextReadLength)) >= 0) {
+            byteBufr.put(tmpBuf, 0, bytesRead);
+            nextReadLength = Math.min(byteBufr.remaining(), tmpBuf.length);
+        }
+
+        if (bytesRead < 0 && byteBufr.remaining() > 0) {
+            throw new EOFException(
+                    "Reached the end of stream with " + byteBufr.remaining() + " bytes left to read");
+        }
+    }
+
+    @Override
+    public long getLength() throws IOException {
+        return input.length();
+    }
+
+    @Override
+    public SeekableInputStream newStream() {
+        return new SeekableInputStream() {
+            private final byte[] tmpBuf = new byte[COPY_BUFFER_SIZE];
+            private long markPos = 0;
+
+            @Override
+            public int read() throws IOException {
+                return input.read();
+            }
+
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public int read(byte[] b) throws IOException {
+                return input.read(b);
+            }
+
+            @SuppressWarnings("NullableProblems")
+            @Override
+            public int read(byte[] b, int off, int len) throws IOException {
+                return input.read(b, off, len);
+            }
+
+            @Override
+            public long skip(long n) throws IOException {
+                final long savPos = input.getFilePointer();
+                final long amtLeft = input.length() - savPos;
+                n = Math.min(n, amtLeft);
+                final long newPos = savPos + n;
+                input.seek(newPos);
+                final long curPos = input.getFilePointer();
+                return curPos - savPos;
+            }
+
+            @Override
+            public int available() throws IOException {
+                return 0;
+            }
+
+            @Override
+            public void close() throws IOException {
+                input.close();
+            }
+
+            @SuppressWarnings({"unchecked", "unused", "UnusedReturnValue"})
+            private <T extends Throwable, R> R uncheckedExceptionThrow(Throwable t) throws T {
+                throw (T) t;
+            }
+
+            @Override
+            public synchronized void mark(int readlimit) {
+                try {
+                    markPos = input.getFilePointer();
+                } catch (IOException e) {
+                    uncheckedExceptionThrow(e);
+                }
+            }
+
+            @Override
+            public synchronized void reset() throws IOException {
+                input.seek(markPos);
+            }
+
+            @Override
+            public boolean markSupported() {
+                return true;
+            }
+
+            @Override
+            public long getPos() throws IOException {
+                return input.getFilePointer();
+            }
+
+            @Override
+            public void seek(long l) throws IOException {
+                input.seek(l);
+            }
+
+            @Override
+            public void readFully(byte[] bytes) throws IOException {
+                input.readFully(bytes);
+            }
+
+            @Override
+            public void readFully(byte[] bytes, int i, int i1) throws IOException {
+                input.readFully(bytes, i, i1);
+            }
+
+            @Override
+            public int read(ByteBuffer byteBuffer) throws IOException {
+                return readDirectBuffer(byteBuffer, tmpBuf, input::read);
+            }
+
+            @Override
+            public void readFully(ByteBuffer byteBuffer) throws IOException {
+                readFullyDirectBuffer(byteBuffer, tmpBuf, input::read);
+            }
+        };
+    }
+
+    private interface ByteBufReader {
+
+        int read(byte[] b, int off, int len) throws IOException;
+    }
+}
+
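
A quick way to see the SeekableInputStream contract in action is to read the 4-byte
parquet magic ("PAR1") from both ends of a local file. A sketch, with a placeholder path:

    import org.apache.doris.common.parquet.LocalInputFile;
    import org.apache.parquet.io.SeekableInputStream;

    import java.io.File;
    import java.nio.charset.StandardCharsets;

    public class ParquetMagicExample {
        public static void main(String[] args) throws Exception {
            LocalInputFile file = new LocalInputFile(new File("/tmp/sample.parquet"));
            try (SeekableInputStream in = file.newStream()) {
                byte[] magic = new byte[4];
                in.readFully(magic);           // parquet files start with the magic "PAR1"
                System.out.println(new String(magic, StandardCharsets.US_ASCII));
                in.seek(file.getLength() - 4); // ...and end with the same 4 bytes
                in.readFully(magic);
                System.out.println(new String(magic, StandardCharsets.US_ASCII));
            }
        }
    }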
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/parquet/ParquetReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/ParquetReader.java
new file mode 100644
index 0000000..5850af1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/ParquetReader.java
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.parquet;
+
+import org.apache.doris.analysis.BrokerDesc;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class ParquetReader {
+
+    private ParquetFileReader fileReader;
+
+    private ParquetReader(InputFile inputFile) throws IOException {
+        ParquetReadOptions readOptions = ParquetReadOptions.builder().build();
+        this.fileReader = new ParquetFileReader(inputFile, readOptions);
+    }
+
+    public static ParquetReader create(String filePath, BrokerDesc brokerDesc) throws IOException {
+        BrokerInputFile inputFile = BrokerInputFile.create(filePath, brokerDesc);
+        return new ParquetReader(inputFile);
+    }
+
+    public static ParquetReader create(String filePath) throws IOException {
+        LocalInputFile inputFile = new LocalInputFile(new File(filePath));
+        return new ParquetReader(inputFile);
+    }
+
+    // For test only. The given ip/port is the broker address.
+    public static ParquetReader create(String filePath, BrokerDesc brokerDesc, String ip, int port) throws IOException {
+        BrokerInputFile inputFile = BrokerInputFile.create(filePath, brokerDesc, ip, port);
+        return new ParquetReader(inputFile);
+    }
+
+    // Get file schema as a list of column names
+    public List<String> getSchema(boolean debug) {
+        List<String> colNames = Lists.newArrayList();
+        FileMetaData metaData = fileReader.getFileMetaData();
+        MessageType messageType = metaData.getSchema();
+        List<ColumnDescriptor> columnDescriptors = messageType.getColumns();
+        for (ColumnDescriptor column : columnDescriptors) {
+            if (debug) {
+                colNames.add(column.toString());
+            } else {
+                String colName = column.getPath()[0];
+                if (column.getMaxDefinitionLevel() > 1) {
+                    // this is a nested column, append its max definition level
+                    colName += " (" + column.getMaxDefinitionLevel() + ")";
+                }
+                colNames.add(colName);
+            }
+        }
+        return colNames;
+    }
+
+    // read at most "limit" rows of the file content as a 2-dimensional list
+    public List<List<String>> getLines(int limit) throws IOException {
+        List<List<String>> lines = Lists.newArrayList();
+        FileMetaData metaData = fileReader.getFileMetaData();
+        MessageType schema = metaData.getSchema();
+        final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+        int readLines = 0;
+        PageReadStore pages = null;
+        while (null != (pages = fileReader.readNextRowGroup()) && readLines < limit) {
+            final long rows = pages.getRowCount();
+            final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
+            for (int i = 0; i < rows && readLines < limit; i++) {
+                List<String> line = Lists.newArrayList();
+                final Group g = recordReader.read();
+                parseGroup(g, line);
+                lines.add(line);
+                readLines++;
+            }
+        }
+        return lines;
+    }
+
+    private void parseGroup(Group g, List<String> line) {
+        int fieldCount = g.getType().getFieldCount();
+        for (int field = 0; field < fieldCount; field++) {
+            int valueCount = g.getFieldRepetitionCount(field);
+            Type fieldType = g.getType().getType(field);
+            String fieldName = fieldType.getName();
+            if (valueCount == 1) {
+                line.add(g.getValueToString(field, 0));
+            } else if (valueCount > 1) {
+                List<String> array = Lists.newArrayList();
+                for (int index = 0; index < valueCount; index++) {
+                    if (fieldType.isPrimitive()) {
+                        array.add(g.getValueToString(field, index));
+                    }
+                }
+                line.add("[" + Joiner.on(",").join(array) + "]");
+            } else {
+                line.add("");
+            }
+        }
+    }
+
+    public void close() throws IOException {
+        fileReader.close();
+    }
+}
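
A short usage sketch of the new ParquetReader API against a local file (the path is a
placeholder; the broker-based create() overload works the same way):

    import org.apache.doris.common.parquet.ParquetReader;

    import java.util.List;

    public class ParquetPreviewExample {
        public static void main(String[] args) throws Exception {
            ParquetReader reader = ParquetReader.create("/tmp/sample.parquet");
            System.out.println(reader.getSchema(false));   // plain column names
            for (List<String> row : reader.getLines(10)) { // at most 10 rows
                System.out.println(String.join(",", row));
            }
            reader.close();
        }
    }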
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java
new file mode 100644
index 0000000..b59e298
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TBrokerCloseReaderRequest;
+import org.apache.doris.thrift.TBrokerFD;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerListPathRequest;
+import org.apache.doris.thrift.TBrokerListResponse;
+import org.apache.doris.thrift.TBrokerOpenReaderRequest;
+import org.apache.doris.thrift.TBrokerOpenReaderResponse;
+import org.apache.doris.thrift.TBrokerOperationStatus;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+import org.apache.doris.thrift.TBrokerPReadRequest;
+import org.apache.doris.thrift.TBrokerReadResponse;
+import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPaloBrokerService;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+
+public class BrokerReader {
+    private static final Logger LOG = LogManager.getLogger(BrokerReader.class);
+
+    private BrokerDesc brokerDesc;
+    private TNetworkAddress address;
+    private TPaloBrokerService.Client client;
+    private long currentPos;
+
+    public static BrokerReader create(BrokerDesc brokerDesc) throws IOException {
+        try {
+            TNetworkAddress address = BrokerUtil.getAddress(brokerDesc);
+            return new BrokerReader(address, BrokerUtil.borrowClient(address), brokerDesc);
+        } catch (UserException e) {
+            throw new IOException(e);
+        }
+    }
+
+    // For test only
+    public static BrokerReader create(BrokerDesc brokerDesc, String ip, int port) throws IOException {
+        try {
+            TNetworkAddress address = new TNetworkAddress(ip, port);
+            return new BrokerReader(address, BrokerUtil.borrowClient(address), brokerDesc);
+        } catch (UserException e) {
+            throw new IOException(e);
+        }
+    }
+
+    private BrokerReader(TNetworkAddress address, TPaloBrokerService.Client client, BrokerDesc brokerDesc) {
+        this.brokerDesc = brokerDesc;
+        this.address = address;
+        this.client = client;
+    }
+
+    public long getCurrentPos() {
+        return currentPos;
+    }
+
+    public byte[] pread(TBrokerFD fd, long offset, int length) throws IOException, EOFException {
+        TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
+                TBrokerVersion.VERSION_ONE, fd, offset, length);
+        TBrokerReadResponse tReadResponse = null;
+        try {
+            tReadResponse = client.pread(tPReadRequest);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        // Check END_OF_FILE before the generic status check; END_OF_FILE is not OK,
+        // so checking OK first would turn every EOF into a generic IOException.
+        if (tReadResponse.getOpStatus().getStatusCode() == TBrokerOperationStatusCode.END_OF_FILE) {
+            throw new EOFException();
+        }
+        if (tReadResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+            throw new IOException("Broker pread failed. fd=" + fd.toString() + ", broker=" + address
+                    + ", msg=" + tReadResponse.getOpStatus().getMessage());
+        }
+        return tReadResponse.getData();
+    }
+
+    public TBrokerFD open(String path) throws IOException {
+        String clientId = FrontendOptions.getLocalHostAddress() + ":" + Config.rpc_port;
+        TBrokerOpenReaderRequest tOpenReaderRequest = new TBrokerOpenReaderRequest(
+                TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getProperties());
+        TBrokerOpenReaderResponse tOpenReaderResponse = null;
+        try {
+            tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        if (tOpenReaderResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+            throw new IOException("Broker open reader failed. path=" + path + ", broker=" + address
+                    + ", msg=" + tOpenReaderResponse.getOpStatus().getMessage());
+        }
+        this.currentPos = 0;
+        return tOpenReaderResponse.getFd();
+    }
+
+    public void close(TBrokerFD fd) {
+        TBrokerCloseReaderRequest tCloseReaderRequest = new TBrokerCloseReaderRequest(
+                TBrokerVersion.VERSION_ONE, fd);
+        TBrokerOperationStatus tOperationStatus = null;
+        try {
+            tOperationStatus = client.closeReader(tCloseReaderRequest);
+        } catch (TException e) {
+            LOG.warn("Broker close reader failed. fd={}, address={}", fd.toString(), address, e);
+        }
+        if (tOperationStatus == null || tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+            // tOperationStatus stays null if closeReader threw, so guard before calling getMessage()
+            LOG.warn("Broker close reader failed. fd={}, address={}, error={}", fd.toString(), address,
+                    tOperationStatus == null ? "no response" : tOperationStatus.getMessage());
+        }
+    }
+
+    public long getFileLength(String path) throws IOException {
+        TBrokerListPathRequest request = new TBrokerListPathRequest(
+                TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getProperties());
+        TBrokerListResponse tBrokerListResponse = null;
+        try {
+            tBrokerListResponse = client.listPath(request);
+        } catch (TException e) {
+            throw new IOException(e);
+        }
+        if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+            throw new IOException("Broker list path failed. path=" + path
+                    + ",broker=" + address + ",msg=" + tBrokerListResponse.getOpStatus().getMessage());
+        }
+
+        if (tBrokerListResponse.files.size() != 1) {
+            throw new IOException("Matched " + tBrokerListResponse.files.size()
+                    + " files. Expected exactly 1. path=" + path);
+        }
+
+        TBrokerFileStatus fileStatus = tBrokerListResponse.files.get(0);
+        if (fileStatus.isDir) {
+            throw new IOException("Path is a directory, expected a file: " + path);
+        }
+
+        return fileStatus.size;
+    }
+
+    public static class EOFException extends Exception {
+        public EOFException() {
+            super();
+        }
+    }
+}
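
BrokerReader wraps the broker thrift RPCs (listPath/openReader/pread/closeReader) into a
file-like API. A minimal read loop, assuming the broker is reachable; the broker name and
path are placeholders:

    import org.apache.doris.analysis.BrokerDesc;
    import org.apache.doris.common.util.BrokerReader;
    import org.apache.doris.thrift.TBrokerFD;

    import java.util.HashMap;

    public class BrokerReadExample {
        public static void main(String[] args) throws Exception {
            BrokerDesc brokerDesc = new BrokerDesc("my_broker", new HashMap<>());
            BrokerReader reader = BrokerReader.create(brokerDesc);
            String path = "hdfs://host:port/path/file";
            long fileLength = reader.getFileLength(path);
            TBrokerFD fd = reader.open(path);
            long pos = 0;
            try {
                while (pos < fileLength) {
                    byte[] chunk = reader.pread(fd, pos, 1024 * 1024); // 1 MB per RPC
                    pos += chunk.length;
                    // ... process chunk ...
                }
            } catch (BrokerReader.EOFException e) {
                // broker signaled end of file before fileLength was reached
            } finally {
                reader.close(fd);
            }
        }
    }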
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index cc73098..5295f96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -245,7 +245,7 @@ public class BrokerUtil {
      * @return byte[]
      * @throws UserException if broker op failed or not only one file
      */
-    public static byte[] readFile(String path, BrokerDesc brokerDesc) throws UserException {
+    public static byte[] readFile(String path, BrokerDesc brokerDesc, long maxReadLen) throws UserException {
         TNetworkAddress address = getAddress(brokerDesc);
         TPaloBrokerService.Client client = borrowClient(address);
         boolean failed = true;
@@ -292,8 +292,12 @@ public class BrokerUtil {
             fd = tOpenReaderResponse.getFd();
 
             // read
+            long readLen = fileSize;
+            if (maxReadLen > 0 && maxReadLen < fileSize) {
+                readLen = maxReadLen;
+            }
             TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
-                    TBrokerVersion.VERSION_ONE, fd, 0, fileSize);
+                    TBrokerVersion.VERSION_ONE, fd, 0, readLen);
             TBrokerReadResponse tReadResponse = null;
             try {
                 tReadResponse = client.pread(tPReadRequest);
@@ -506,7 +510,7 @@ public class BrokerUtil {
         return new TNetworkAddress(broker.ip, broker.port);
     }
 
-    private static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException {
+    public static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException {
         TPaloBrokerService.Client client = null;
         try {
             client = ClientPool.brokerPool.borrowObject(address);
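
The readFile() change above adds a cap on how many bytes are pulled from the broker:
ImportAction below passes MAX_READ_LEN_BYTES (1 MB) so that previewing a huge CSV file
does not load the whole file into FE memory, and a non-positive maxReadLen keeps the old
read-everything behavior. A call now looks like this (path and brokerDesc are
placeholders):

    // Read at most 1 MB of the file for preview purposes.
    byte[] head = BrokerUtil.readFile("hdfs://host:port/path/file.csv", brokerDesc, 1024 * 1024);

borrowClient() is also made public so that the new BrokerReader can reuse the broker
client pool.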
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ImportAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ImportAction.java
new file mode 100644
index 0000000..267ae98
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ImportAction.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.restv2;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.parquet.ParquetReader;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.thrift.TBrokerFileStatus;
+
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@RestController
+@RequestMapping("/rest/v2")
+public class ImportAction {
+
+    private static final Logger LOG = LogManager.getLogger(ImportAction.class);
+
+    private static final long MAX_READ_LEN_BYTES = 1024 * 1024; // 1MB
+
+    private static final String FORMAT_CSV = "CSV";
+    private static final String FORMAT_PARQUET = "PARQUET";
+    private static final String FORMAT_ORC = "ORC";
+
+    private static final int MAX_SAMPLE_LINE = 50;
+
+    /**
+     * Request body:
+     * {
+     *  "fileInfo": {
+     *      "columnSeparator": ",",
+     *      "fileUrl": "hdfs://127.0.0.1:50070/file/test/text*",
+     *      "format": "CSV" // CSV or PARQUET
+     *  },
+     *  "connectInfo": {  // required; used to access the file through a broker
+     *      "brokerName" : "my_broker",
+     *      "brokerProps" : {
+     *          "username" : "yyy",
+     *          "password" : "xxx"
+     *      }
+     *  }
+     * }
+     */
+    @RequestMapping(path = "/api/import/file_review", method = RequestMethod.POST)
+    public Object fileReview(@RequestBody FileReviewRequestVo body, HttpServletRequest request, HttpServletResponse response) {
+        FileInfo fileInfo = body.getFileInfo();
+        ConnectInfo connectInfo = body.getConnectInfo();
+        if (fileInfo == null || connectInfo == null) {
+            // broker info is required to access the file, so fail fast instead of throwing NPE below
+            return ResponseEntityBuilder.badRequest("Both fileInfo and connectInfo are required");
+        }
+        BrokerDesc brokerDesc = new BrokerDesc(connectInfo.getBrokerName(), connectInfo.getBrokerProps());
+
+        List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
+        try {
+            // get file status
+            BrokerUtil.parseFile(fileInfo.getFileUrl(), brokerDesc, fileStatuses);
+            // create response
+            FileReviewResponseVo reviewResponseVo = createFileReviewResponse(brokerDesc, fileInfo, fileStatuses);
+            return ResponseEntityBuilder.ok(reviewResponseVo);
+        } catch (UserException e) {
+            return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+        }
+    }
+
+    private FileReviewResponseVo createFileReviewResponse(BrokerDesc brokerDesc, FileInfo fileInfo,
+                                                          List<TBrokerFileStatus> fileStatuses) throws UserException {
+        FileReviewResponseVo responseVo = new FileReviewResponseVo();
+        // set file review statistic
+        FileReviewStatistic statistic = new FileReviewStatistic();
+        statistic.setFileNumber(fileStatuses.size());
+        long totalFileSize = 0;
+        for (TBrokerFileStatus fStatus : fileStatuses) {
+            if (fStatus.isDir) {
+                throw new UserException("Not all matched paths are files: " + fStatus.path);
+            }
+            totalFileSize += fStatus.size;
+        }
+        statistic.setFileSize(totalFileSize);
+        responseVo.setReviewStatistic(statistic);
+
+        if (fileStatuses.isEmpty()) {
+            return responseVo;
+        }
+
+        // Preview only the first matched file.
+        TBrokerFileStatus sampleFile = fileStatuses.get(0);
+        FileSample fileSample = new FileSample();
+        fileSample.setSampleFileName(sampleFile.path);
+
+        if (fileInfo.format.equalsIgnoreCase(FORMAT_CSV)) {
+            byte[] fileContentBytes = BrokerUtil.readFile(sampleFile.path, brokerDesc, MAX_READ_LEN_BYTES);
+            parseContent(fileInfo.columnSeparator, "\n", fileContentBytes, fileSample);
+        } else if (fileInfo.format.equalsIgnoreCase(FORMAT_PARQUET)) {
+            try {
+                ParquetReader parquetReader = ParquetReader.create(sampleFile.path, brokerDesc);
+                parseParquet(parquetReader, fileSample);
+            } catch (IOException e) {
+                LOG.warn("failed to get sample data of parquet file: {}", sampleFile.path, e);
+                throw new UserException("failed to get sample data of parquet file. " + e.getMessage());
+            }
+        } else {
+            throw new UserException("Only CSV and PARQUET file formats are supported");
+        }
+
+        responseVo.setFileSample(fileSample);
+        return responseVo;
+    }
+
+    private void parseContent(String columnSeparator, String lineDelimiter, byte[] fileContentBytes,
+                                            FileSample fileSample) {
+        List<List<String>> sampleLines = Lists.newArrayList();
+        int maxColSize = 0;
+        String content = new String(fileContentBytes);
+        String[] lines = content.split(lineDelimiter);
+        for (String line : lines) {
+            if (sampleLines.size() >= MAX_SAMPLE_LINE) {
+                break;
+            }
+            // String.split treats the separator as a regex, so quote user-supplied separators like "|"
+            String[] cols = line.split(Pattern.quote(columnSeparator));
+            List<String> row = Lists.newArrayList(cols);
+            sampleLines.add(row);
+            maxColSize = Math.max(maxColSize, row.size());
+        }
+
+        fileSample.setFileLineNumber(sampleLines.size());
+        fileSample.setMaxColumnSize(maxColSize);
+        fileSample.setSampleFileLines(sampleLines);
+        return;
+    }
+
+    private void parseParquet(ParquetReader reader, FileSample fileSample) throws IOException {
+        fileSample.setColNames(reader.getSchema(false));
+        fileSample.setMaxColumnSize(fileSample.colNames.size());
+        fileSample.setSampleFileLines(reader.getLines(MAX_SAMPLE_LINE));
+        fileSample.setFileLineNumber(fileSample.sampleFileLines.size());
+    }
+
+    @Getter
+    @Setter
+    public static class FileReviewRequestVo {
+        private FileInfo fileInfo;
+        private ConnectInfo connectInfo;
+    }
+
+    @Getter
+    @Setter
+    public static class FileInfo {
+        private String columnSeparator;
+        private String fileUrl;
+        private String format;
+    }
+
+    @Getter
+    @Setter
+    public static class ConnectInfo {
+        private String brokerName;
+        private Map<String, String> brokerProps;
+    }
+
+    @Getter
+    @Setter
+    public static class FileReviewResponseVo {
+        private FileReviewStatistic reviewStatistic;
+        private FileSample fileSample;
+    }
+
+    @Getter
+    @Setter
+    public static class FileReviewStatistic {
+        private int fileNumber;
+        private long fileSize;
+    }
+
+    @Getter
+    @Setter
+    public static class FileSample {
+        private String sampleFileName;
+        private int fileLineNumber;
+        private int maxColumnSize;
+        private List<String> colNames;
+        private List<List<String>> sampleFileLines;
+    }
+
+
+    public static void main(String[] args) {
+        ImportAction importAction = new ImportAction();
+        String str = "1,2,3\n4,5,6\n,7,8,9,中国";
+        byte[] fileContentBytes = str.getBytes();
+        System.out.println(fileContentBytes.length);
+        String newStr = new String(fileContentBytes, 0,fileContentBytes.length - 2);
+        System.out.println(newStr);
+
+        FileSample fileSample = new FileSample();
+        importAction.parseContent(",", "\n", newStr.getBytes(), fileSample);
+        System.out.println(fileSample.sampleFileLines);
+    }
+
+}
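
Based on the VO classes above, a successful file_review response looks roughly like this
(illustrative values, wrapped in the standard msg/code/data/count envelope used by the
other /rest/v2 APIs):

    {
        "msg": "success",
        "code": 0,
        "data": {
            "reviewStatistic": { "fileNumber": 1, "fileSize": 1024 },
            "fileSample": {
                "sampleFileName": "hdfs://127.0.0.1:50070/file/test/text1.csv",
                "fileLineNumber": 2,
                "maxColumnSize": 3,
                "colNames": null,
                "sampleFileLines": [["1","2","3"], ["4","5","6"]]
            }
        },
        "count": 0
    }

colNames is only populated for PARQUET files, where it carries the schema from
ParquetReader.getSchema().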
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/MetaInfoActionV2.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/MetaInfoActionV2.java
new file mode 100644
index 0000000..c3f64e2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/MetaInfoActionV2.java
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.restv2;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.cluster.ClusterNamespace;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.httpv2.exception.BadRequestException;
+import org.apache.doris.httpv2.rest.RestBaseController;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.SystemInfoService;
+
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Get meta info like databases, tables and schemas
+ */
+@RestController
+@RequestMapping("/rest/v2")
+public class MetaInfoActionV2 extends RestBaseController {
+
+    private static final String NAMESPACES = "namespaces";
+    private static final String DATABASES = "databases";
+    private static final String TABLES = "tables";
+    private static final String PARAM_LIMIT = "limit";
+    private static final String PARAM_OFFSET = "offset";
+    private static final String PARAM_WITH_MV = "with_mv";
+
+    /**
+     * Get all databases
+     * {
+     * 	"msg": "success",
+     * 	"code": 0,
+     * 	"data": [
+     * 		"default_cluster:db1",
+     * 		"default_cluster:doris_audit_db__",
+     * 		"default_cluster:information_schema"
+     * 	],
+     * 	"count": 0
+     * }
+     */
+    @RequestMapping(path = "/api/meta/" + NAMESPACES + "/{" + NS_KEY + "}/" + DATABASES,
+            method = {RequestMethod.GET})
+    public Object getAllDatabases(
+            @PathVariable(value = NS_KEY) String ns,
+            HttpServletRequest request, HttpServletResponse response) {
+        checkWithCookie(request, response, false);
+
+        if (!ns.equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
+            return ResponseEntityBuilder.badRequest("Only 'default_cluster' is supported for now");
+        }
+
+        // 1. get all database with privilege
+        List<String> dbNames = null;
+        try {
+            dbNames = Catalog.getCurrentCatalog().getClusterDbNames(ns);
+        } catch (AnalysisException e) {
+            return ResponseEntityBuilder.okWithCommonError("namespace does not exist: " + ns);
+        }
+        // keep only the databases on which the user has SHOW privilege
+        List<String> authorizedDbNames = Lists.newArrayList();
+        for (String fullName : dbNames) {
+            if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), fullName,
+                    PrivPredicate.SHOW)) {
+                continue;
+            }
+            authorizedDbNames.add(fullName);
+        }
+
+        Collections.sort(authorizedDbNames);
+
+        // handle limit offset
+        Pair<Integer, Integer> fromToIndex = getFromToIndex(request, authorizedDbNames.size());
+        return ResponseEntityBuilder.ok(authorizedDbNames.subList(fromToIndex.first, fromToIndex.second));
+    }
+
+    /** Get all tables of a database
+     * {
+     * 	"msg": "success",
+     * 	"code": 0,
+     * 	"data": [
+     * 		"tbl1",
+     * 		"tbl2"
+     * 	],
+     * 	"count": 0
+     * }
+     */
+
+    @RequestMapping(path = "/api/meta/" + NAMESPACES + "/{" + NS_KEY + "}/" + DATABASES + "/{" + DB_KEY + "}/" + TABLES,
+            method = {RequestMethod.GET})
+    public Object getTables(
+            @PathVariable(value = NS_KEY) String ns, @PathVariable(value = DB_KEY) String dbName,
+            HttpServletRequest request, HttpServletResponse response) {
+        checkWithCookie(request, response, false);
+
+        if (!ns.equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
+            return ResponseEntityBuilder.badRequest("Only 'default_cluster' is supported for now");
+        }
+
+
+        String fullDbName = getFullDbName(dbName);
+        Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
+        if (db == null) {
+            return ResponseEntityBuilder.okWithCommonError("Database does not exist: " + fullDbName);
+        }
+
+        List<String> tblNames = Lists.newArrayList();
+        db.readLock();
+        try {
+            for (Table tbl : db.getTables()) {
+                if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), fullDbName, tbl.getName(),
+                        PrivPredicate.SHOW)) {
+                    continue;
+                }
+                tblNames.add(tbl.getName());
+            }
+        } finally {
+            db.readUnlock();
+        }
+
+        Collections.sort(tblNames);
+
+        // handle limit offset
+        Pair<Integer, Integer> fromToIndex = getFromToIndex(request, tblNames.size());
+        return ResponseEntityBuilder.ok(tblNames.subList(fromToIndex.first, fromToIndex.second));
+    }
+
+    /**
+     * {
+     *     "msg": "success",
+     *     "code": 0,
+     *     "data": {
+     *         "engineType": "OLAP",
+     *         "schemaInfo": {
+     *             "schemaMap": {
+     *                 "tbl1": {
+     *                     "schema": {
+     *                         "field": "k2",
+     *                         "type": "INT",
+     *                         "isNull": "true",
+     *                         "defaultVal": null,
+     *                         "key": "true",
+     *                         "aggrType": "None",
+     *                         "comment": ""
+     *                     },
+     *                     "keyType": "DUP_KEYS",
+     *                     "baseIndex": true
+     *                 }
+     *             }
+     *         }
+     *     },
+     *     "count": 0
+     * }
+     */
+    @RequestMapping(path = "/api/meta/" + NAMESPACES + "/{" + NS_KEY + "}/" + DATABASES + "/{" + DB_KEY + "}/" + TABLES
+            + "/{" + TABLE_KEY + "}/schema",
+            method = {RequestMethod.GET})
+    public Object getTableSchema(
+            @PathVariable(value = NS_KEY) String ns, @PathVariable(value = DB_KEY) String dbName,
+            @PathVariable(value = TABLE_KEY) String tblName,
+            HttpServletRequest request, HttpServletResponse response) throws UserException {
+        checkWithCookie(request, response, false);
+
+        if (!ns.equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
+            return ResponseEntityBuilder.badRequest("Only 'default_cluster' is supported for now");
+        }
+
+        String fullDbName = getFullDbName(dbName);
+        checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tblName, PrivPredicate.SHOW);
+
+        Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
+        if (db == null) {
+            return ResponseEntityBuilder.okWithCommonError("Database does not exist: " + fullDbName);
+        }
+
+        String withMvPara = request.getParameter(PARAM_WITH_MV);
+        boolean withMv = "1".equals(withMvPara);
+
+
+        TableSchemaInfo tableSchemaInfo = new TableSchemaInfo();
+        db.readLock();
+        try {
+            Table tbl = db.getTable(tblName);
+            if (tbl == null) {
+                return ResponseEntityBuilder.okWithCommonError("Table does not exist: " + tblName);
+            }
+
+            tableSchemaInfo.setEngineType(tbl.getType().toString());
+            SchemaInfo schemaInfo = generateSchemaInfo(tbl, withMv);
+            tableSchemaInfo.setSchemaInfo(schemaInfo);
+            return ResponseEntityBuilder.ok(tableSchemaInfo);
+        } finally {
+            db.readUnlock();
+        }
+    }
+
+    private SchemaInfo generateSchemaInfo(Table tbl, boolean withMv) {
+        SchemaInfo schemaInfo = new SchemaInfo();
+        Map<String, TableSchema> schemaMap = Maps.newHashMap();
+        if (tbl.getType() == Table.TableType.OLAP) {
+            OlapTable olapTable = (OlapTable) tbl;
+            long baseIndexId = olapTable.getBaseIndexId();
+            TableSchema baseTableSchema = new TableSchema();
+            baseTableSchema.setBaseIndex(true);
+            baseTableSchema.setKeyType(olapTable.getKeysTypeByIndexId(baseIndexId).name());
+            Schema baseSchema = generateSchame(olapTable.getSchemaByIndexId(baseIndexId));
+            baseTableSchema.setSchema(baseSchema);
+            schemaMap.put(olapTable.getIndexNameById(baseIndexId), baseTableSchema);
+
+            if (withMv) {
+                for (long indexId : olapTable.getIndexIdListExceptBaseIndex()) {
+                    TableSchema tableSchema = new TableSchema();
+                    tableSchema.setBaseIndex(false);
+                    tableSchema.setKeyType(olapTable.getKeysTypeByIndexId(indexId).name());
+                    Schema schema = generateSchame(olapTable.getSchemaByIndexId(indexId));
+                    tableSchema.setSchema(schema);
+                    schemaMap.put(olapTable.getIndexNameById(indexId), tableSchema);
+                }
+            }
+
+        } else {
+            TableSchema tableSchema = new TableSchema();
+            tableSchema.setBaseIndex(false);
+            Schema schema = generateSchame(tbl.getBaseSchema());
+            tableSchema.setSchema(schema);
+            schemaMap.put(tbl.getName(), tableSchema);
+        }
+        schemaInfo.setSchemaMap(schemaMap);
+        return schemaInfo;
+    }
+
+    private List<Schema> buildSchema(List<Column> columns) {
+        List<Schema> schemas = Lists.newArrayList();
+        for (Column column : columns) {
+            Schema schema = new Schema();
+            schema.setField(column.getName());
+            schema.setType(column.getType().toString());
+            schema.setIsNull(String.valueOf(column.isAllowNull()));
+            schema.setDefaultVal(column.getDefaultValue());
+            schema.setKey(String.valueOf(column.isKey()));
+            schema.setAggrType(column.getAggregationType() == null ?
+                    "None" : column.getAggregationType().toString());
+            schema.setComment(column.getComment());
+            schemas.add(schema);
+        }
+        return schemas;
+    }
+
+    private void generateResult(Table tbl, boolean isBaseIndex,
+                                Map<String, Map<String, Object>> result) throws UserException {
+        Map<String, Object> propMap = result.get(tbl.getName());
+        if (propMap == null) {
+            propMap = Maps.newHashMap();
+            result.put(tbl.getName(), propMap);
+        }
+
+        propMap.put("isBase", isBaseIndex);
+        propMap.put("tableType", tbl.getEngine());
+        if (tbl.getType() == Table.TableType.OLAP) {
+            propMap.put("keyType", ((OlapTable)tbl).getKeysType());
+        }
+        propMap.put("schema", generateSchema(tbl.getBaseSchema()));
+    }
+
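+    // Sketch of one element of the list built below (illustrative values):
+    // {Field=site_id, Type=INT, Null=false, Default=null, Key=true, AggType=None, Comment=""}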
+    List<Map<String, String>> generateSchema(List<Column> columns) throws UserException {
+        List<Map<String, String>> schema = Lists.newArrayList();
+        for (Column column : columns) {
+            Map<String, String> colSchema = Maps.newHashMap();
+            colSchema.put("Field", column.getName());
+            colSchema.put("Type", column.getType().toString());
+            colSchema.put("Null", String.valueOf(column.isAllowNull()));
+            colSchema.put("Default", column.getDefaultValue());
+            colSchema.put("Key", String.valueOf(column.isKey()));
+            colSchema.put("AggType", column.getAggregationType().toString());
+            colSchema.put("Comment", column.getComment());
+            schema.add(colSchema);
+        }
+        return schema;
+    }
+
+    private String convertIfNull(String val) {
+        return FeConstants.null_string.equals(val) ? null : val;
+    }
+
+    // get limit and offset from query parameter
+    // and return fromIndex and toIndex of a list
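+    // Worked example (hypothetical values): with maxNum = 25 and query params limit=10&offset=20,
+    // the result is fromIndex = min(20, 24) = 20 and toIndex = min(10 + 20, 25) = 25,
+    // i.e. the caller slices elements [20, 25) with list.subList(fromIndex, toIndex)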
+    private Pair<Integer, Integer> getFromToIndex(HttpServletRequest request, int maxNum) {
+        String limitStr = request.getParameter(PARAM_LIMIT);
+        String offsetStr = request.getParameter(PARAM_OFFSET);
+
+        int offset = 0;
+        int limit = Integer.MAX_VALUE;
+        if (Strings.isNullOrEmpty(limitStr)) {
+            // limit not set
+            if (!Strings.isNullOrEmpty(offsetStr)) {
+                throw new BadRequestException("Param offset should be set with param limit");
+            }
+        } else {
+            // limit is set
+            try {
+                limit = Integer.parseInt(limitStr);
+                if (!Strings.isNullOrEmpty(offsetStr)) {
+                    offset = Integer.parseInt(offsetStr);
+                }
+            } catch (NumberFormatException e) {
+                throw new BadRequestException("Param limit and offset should be integers");
+            }
+            if (limit < 0) {
+                throw new BadRequestException("Param limit should >= 0");
+            }
+            if (offset < 0) {
+                throw new BadRequestException("Param offset should >= 0");
+            }
+        }
+
+        if (maxNum <= 0) {
+            return Pair.create(0, 0);
+        }
+        // use long arithmetic to avoid int overflow when limit is Integer.MAX_VALUE
+        return Pair.create(Math.min(offset, maxNum - 1), (int) Math.min((long) limit + offset, (long) maxNum));
+    }
+
+    @Getter
+    @Setter
+    public static class TableSchemaInfo {
+        private String engineType;
+        private SchemaInfo schemaInfo;
+    }
+
+    @Getter
+    @Setter
+    public static class SchemaInfo {
+        // tbl(index name) -> schema
+        private Map<String, TableSchema> schemaMap;
+    }
+
+    @Getter
+    @Setter
+    public static class TableSchema {
+        private List<Schema> schema;
+        private boolean isBaseIndex;
+        private String keyType;
+    }
+
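+    // Schema describes a single column; TableSchema.schema holds one entry per column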
+    @Getter
+    @Setter
+    public static class Schema {
+        private String field;
+        private String type;
+        private String isNull;
+        private String defaultVal;
+        private String key;
+        private String aggrType;
+        private String comment;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java
new file mode 100644
index 0000000..6fff16f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.restv2;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.httpv2.rest.RestBaseController;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.google.common.collect.Maps;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.List;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/rest/v2")
+public class StatisticAction extends RestBaseController {
+
+    private static final Logger LOG = LogManager.getLogger(StatisticAction.class);
+
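+    // A sketch of the resultMap payload returned below (values are illustrative;
+    // diskOccupancy and remainDisk are byte counts summed over all backends):
+    // {"dbCount": 10, "tblCount": 20, "beCount": 3, "feCount": 1,
+    //  "diskOccupancy": 10240, "remainDisk": 10240000}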
+    @RequestMapping(path = "/api/cluster_overview", method = RequestMethod.GET)
+    public Object clusterOverview(HttpServletRequest request, HttpServletResponse response) {
+        Map<String, Object> resultMap = Maps.newHashMap();
+        Catalog catalog = Catalog.getCurrentCatalog();
+        SystemInfoService infoService = Catalog.getCurrentSystemInfo();
+
+        resultMap.put("dbCount", catalog.getDbIds().size());
+        resultMap.put("tblCount", getTblCount(catalog));
+        resultMap.put("diskOccupancy", getDiskOccupancy(infoService));
+        resultMap.put("beCount", infoService.getClusterBackendIds(SystemInfoService.DEFAULT_CLUSTER).size());
+        resultMap.put("feCount", catalog.getFrontends(null).size());
+        resultMap.put("remainDisk", getRemainDisk(infoService));
+
+        return ResponseEntityBuilder.ok(resultMap);
+    }
+
+    private int getTblCount(Catalog catalog) {
+        int tblCount = 0;
+        List<Long> dbIds = catalog.getDbIds();
+        for (long dbId : dbIds) {
+            Database db = catalog.getDb(dbId);
+            if (db == null) {
+                // the database may have been dropped concurrently, just skip it
+                continue;
+            }
+            tblCount += db.getTables().size();
+        }
+        return tblCount;
+    }
+
+    private long getDiskOccupancy(SystemInfoService infoService) {
+        long diskOccupancy = 0;
+        List<Backend> backends = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
+        for (Backend be : backends) {
+            diskOccupancy += be.getDataUsedCapacityB();
+        }
+        return diskOccupancy;
+    }
+
+    private long getRemainDisk(SystemInfoService infoService) {
+        long remainDisk = 0;
+        List<Backend> backends = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
+        for (Backend be : backends) {
+            remainDisk += be.getAvailableCapacityB();
+        }
+        return remainDisk;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 21f6431..f26793a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -33,12 +33,6 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig;
 import org.apache.doris.load.loadv2.etl.SparkEtlJob;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TEtlState;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
 
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -47,6 +41,13 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.spark.launcher.SparkLauncher;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -245,7 +246,7 @@ public class SparkEtlJobHandler {
             // get dpp result
             String dppResultFilePath = EtlJobConfig.getDppResultFilePath(etlOutputPath);
             try {
-                byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc);
+                byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc, 0);
                 String dppResultStr = new String(data, "UTF-8");
                 DppResult dppResult = new Gson().fromJson(dppResultStr, DppResult.class);
                 if (dppResult != null) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/parquet/ParquetReaderTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/parquet/ParquetReaderTest.java
new file mode 100644
index 0000000..6c0896c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/parquet/ParquetReaderTest.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.parquet;
+
+import org.apache.doris.analysis.BrokerDesc;
+
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ParquetReaderTest {
+    private static final Logger LOG = LogManager.getLogger(ParquetReaderTest.class);
+
+    // Before running this test, you need to set:
+    //   - the local and remote file paths
+    //   - the access key, secret key and broker desc
+    @Ignore
+    @Test
+    public void testWrongFormat() {
+        try {
+            // local
+            String file = "/path/to/local/all_type_none.parquet";
+            ParquetReader reader = ParquetReader.create(file);
+            LOG.info(reader.getSchema(false));
+
+            // remote
+            String file2 = "bos://cmy-repe/all_type_none.parquet";
+            Map<String, String> properties = Maps.newHashMap();
+            properties.put("bos_endpoint", "http://bj.bcebos.com");
+            properties.put("bos_accesskey", "1");
+            properties.put("bos_secret_accesskey", "2");
+            BrokerDesc brokerDesc = new BrokerDesc("dummy", properties);
+
+            ParquetReader reader2 = ParquetReader.create(file2, brokerDesc, "127.0.0.1", 8118);
+            LOG.info(reader2.getSchema(false));
+        } catch (Exception e) {
+            LOG.info("error: ", e);
+        }
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
index 4be5d0f..af7d327 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
@@ -17,9 +17,6 @@
 
 package org.apache.doris.common.util;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.catalog.BrokerMgr;
 import org.apache.doris.catalog.Catalog;
@@ -48,11 +45,7 @@ import org.apache.doris.thrift.TPaloBrokerService;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.Mocked;
-import mockit.MockUp;
+
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Test;
@@ -61,6 +54,14 @@ import java.io.UnsupportedEncodingException;
 import java.util.Collections;
 import java.util.List;
 
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 public class BrokerUtilTest {
 
     @Test
@@ -218,7 +219,7 @@ public class BrokerUtilTest {
         };
 
         BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
-        byte[] data = BrokerUtil.readFile(filePath, brokerDesc);
+        byte[] data = BrokerUtil.readFile(filePath, brokerDesc, 0);
         String readStr = new String(data, "UTF-8");
         Assert.assertEquals(dppResultStr, readStr);
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index 73df628..a462630 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -46,14 +46,15 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
 import org.apache.doris.thrift.TEtlState;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloBrokerService;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 import org.apache.spark.launcher.SparkLauncher;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -233,7 +234,7 @@ public class SparkEtlJobHandlerTest {
                 minTimes = 0;
                 result = commandResult;
 
-                BrokerUtil.readFile(anyString, (BrokerDesc) any);
+                BrokerUtil.readFile(anyString, (BrokerDesc) any, anyLong);
                 result = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}";
             }
         };
diff --git a/fe/pom.xml b/fe/pom.xml
index 0665f65..c0c3f50 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -672,6 +672,14 @@ under the License.
                 <version>1.2</version>
             </dependency>
 
+            <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
+            <dependency>
+                <groupId>org.projectlombok</groupId>
+                <artifactId>lombok</artifactId>
+                <version>1.18.16</version>
+                <scope>provided</scope>
+            </dependency>
+
         </dependencies>
     </dependencyManagement>
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org