Posted to commits@doris.apache.org by mo...@apache.org on 2021/07/18 14:14:52 UTC
[incubator-doris] branch master updated: [HttpV2] Add more httpv2 APIs (#6210)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new a4b1622 [HttpV2] Add more httpv2 APIs (#6210)
a4b1622 is described below
commit a4b1622cebb10e9fbce0c9e5b4c4598f0bc08e50
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Sun Jul 18 22:14:42 2021 +0800
[HttpV2] Add more httpv2 APIs (#6210)
1. /api/cluster_overview to view statistics of the cluster (an illustrative client call follows below)
2. /api/meta/ to view the database/table schema
3. /api/import/file_review to preview the content of CSV or PARQUET files
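As an illustration (not part of the patch), the new endpoints are served under the /rest/v2 prefix and can be called with any HTTP client. A minimal Java 11 sketch for the cluster overview endpoint, assuming an FE on 127.0.0.1 with the common http port 8030 and that authentication (cookies/credentials) is handled separately:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class ClusterOverviewExample {
        public static void main(String[] args) throws Exception {
            // host and port are placeholders for a local FE
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:8030/rest/v2/api/cluster_overview"))
                    .GET()
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
        }
    }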
---
.../src/main/java/org/apache/doris/PaloFe.java | 1 +
.../doris/common/parquet/BrokerInputFile.java | 272 ++++++++++++++
.../doris/common/parquet/LocalInputFile.java | 197 +++++++++++
.../apache/doris/common/parquet/ParquetReader.java | 136 +++++++
.../org/apache/doris/common/util/BrokerReader.java | 166 +++++++++
.../org/apache/doris/common/util/BrokerUtil.java | 10 +-
.../apache/doris/httpv2/restv2/ImportAction.java | 228 ++++++++++++
.../doris/httpv2/restv2/MetaInfoActionV2.java | 392 +++++++++++++++++++++
.../doris/httpv2/restv2/StatisticAction.java | 89 +++++
.../doris/load/loadv2/SparkEtlJobHandler.java | 15 +-
.../doris/common/parquet/ParquetReaderTest.java | 61 ++++
.../apache/doris/common/util/BrokerUtilTest.java | 19 +-
.../doris/load/loadv2/SparkEtlJobHandlerTest.java | 7 +-
fe/pom.xml | 8 +
14 files changed, 1579 insertions(+), 22 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
index ab45c7a..9d4cc9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/PaloFe.java
@@ -313,3 +313,4 @@ public class PaloFe {
}
}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/parquet/BrokerInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/BrokerInputFile.java
new file mode 100644
index 0000000..f2aa3d6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/BrokerInputFile.java
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.parquet;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.util.BrokerReader;
+import org.apache.doris.thrift.TBrokerFD;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class BrokerInputFile implements InputFile {
+ private static final Logger LOG = LogManager.getLogger(BrokerInputFile.class);
+
+ private static final int COPY_BUFFER_SIZE = 1024 * 1024;
+
+ private BrokerDesc brokerDesc;
+ private String filePath;
+ private long fileLength = 0;
+ private BrokerReader reader;
+ private TBrokerFD fd;
+
+ public static BrokerInputFile create(String filePath, BrokerDesc brokerDesc) throws IOException {
+ BrokerInputFile inputFile = new BrokerInputFile(filePath, brokerDesc);
+ inputFile.init();
+ return inputFile;
+ }
+
+ // For test only. The given ip/port is the broker's address.
+ public static BrokerInputFile create(String filePath, BrokerDesc brokerDesc, String ip, int port) throws IOException {
+ BrokerInputFile inputFile = new BrokerInputFile(filePath, brokerDesc);
+ inputFile.init(ip, port);
+ return inputFile;
+ }
+
+ private BrokerInputFile(String filePath, BrokerDesc brokerDesc) {
+ this.filePath = filePath;
+ this.brokerDesc = brokerDesc;
+ }
+
+ private void init() throws IOException {
+ this.reader = BrokerReader.create(this.brokerDesc);
+ this.fileLength = this.reader.getFileLength(filePath);
+ this.fd = this.reader.open(filePath);
+ }
+
+ // For test only. The given ip/port is the broker's address.
+ private void init(String ip, int port) throws IOException {
+ this.reader = BrokerReader.create(this.brokerDesc, ip, port);
+ this.fileLength = this.reader.getFileLength(filePath);
+ this.fd = this.reader.open(filePath);
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return fileLength;
+ }
+
+ @Override
+ public SeekableInputStream newStream() throws IOException {
+ return new SeekableInputStream() {
+ private final byte[] tmpBuf = new byte[COPY_BUFFER_SIZE];
+ private long currentPos = 0;
+ private long markPos = 0;
+
+ private long bufferOffset = 0;
+ private long bufferLimit = 0;
+
+ @Override
+ public int read() throws IOException {
+ try {
+ // bufferLimit is exclusive, so refill whenever currentPos falls outside [bufferOffset, bufferLimit)
+ if (currentPos < bufferOffset || currentPos >= bufferLimit) {
+ bufferOffset = currentPos;
+ fill();
+ }
+ if (currentPos >= bufferLimit) {
+ LOG.warn("current pos {} is not within buffer limit {} even after refill. should not happen.", currentPos, bufferLimit);
+ return -1;
+ }
+
+ int pos = (int) (currentPos - bufferOffset);
+ int res = Byte.toUnsignedInt(tmpBuf[pos]);
+ ++currentPos;
+ return res;
+ } catch (BrokerReader.EOFException e) {
+ return -1;
+ }
+ }
+
+ private void fill() throws IOException, BrokerReader.EOFException {
+ byte[] data = reader.pread(fd, bufferOffset, COPY_BUFFER_SIZE);
+ System.arraycopy(data, 0, tmpBuf, 0, data.length);
+ bufferLimit = bufferOffset + data.length;
+ }
+
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ byte[] data = reader.pread(fd, currentPos, b.length);
+ System.arraycopy(data, 0, b, 0, data.length);
+ currentPos += data.length;
+ return data.length;
+ } catch (BrokerReader.EOFException e) {
+ return -1;
+ }
+ }
+
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ try {
+ if (currentPos < bufferOffset || currentPos > bufferLimit || currentPos + len > bufferLimit) {
+ if (len > COPY_BUFFER_SIZE) {
+ // the data to be read is larger than the max buffer size,
+ // so read it directly.
+ byte[] data = reader.pread(fd, currentPos, len);
+ System.arraycopy(data, 0, b, off, data.length);
+ currentPos += data.length;
+ return data.length;
+ }
+ // fill the buffer first
+ bufferOffset = currentPos;
+ fill();
+ }
+
+ if (currentPos >= bufferLimit) {
+ LOG.warn("current pos {} is larger than buffer limit {}. should not happen.", currentPos, bufferLimit);
+ return -1;
+ }
+
+ int start = (int) (currentPos - bufferOffset);
+ int readLen = Math.min(len, (int) (bufferLimit - bufferOffset));
+ System.arraycopy(tmpBuf, start, b, off, readLen);
+ currentPos += readLen;
+ return readLen;
+ } catch (BrokerReader.EOFException e) {
+ return -1;
+ }
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ final long left = fileLength - currentPos;
+ long min = Math.min(n, left);
+ currentPos += min;
+ return min;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close(fd);
+ }
+
+ @SuppressWarnings({"unchecked", "unused", "UnusedReturnValue"})
+ private <T extends Throwable, R> R uncheckedExceptionThrow(Throwable t) throws T {
+ throw (T) t;
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ markPos = currentPos;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ currentPos = markPos;
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return currentPos;
+ }
+
+ @Override
+ public void seek(long l) throws IOException {
+ currentPos = l;
+ }
+
+ @Override
+ public void readFully(byte[] bytes) throws IOException {
+ try {
+ byte[] data = reader.pread(fd, currentPos, bytes.length);
+ System.arraycopy(data, 0, bytes, 0, data.length);
+ currentPos += data.length;
+ if (data.length < bytes.length) {
+ throw new EOFException("Reach the end of file with " + (bytes.length - data.length)
+ + " bytes left to read");
+ }
+ } catch (BrokerReader.EOFException e) {
+ throw new EOFException("Reach the end of file");
+ }
+ }
+
+ @Override
+ public void readFully(byte[] bytes, int offset, int len) throws IOException {
+ try {
+ byte[] data = reader.pread(fd, currentPos, len);
+ System.arraycopy(data, 0, bytes, offset, data.length);
+ currentPos += data.length;
+ if (data.length < len) {
+ throw new EOFException("Reach the end of file with " + (len - data.length)
+ + " bytes left to read");
+ }
+ } catch (BrokerReader.EOFException e) {
+ throw new EOFException("Reach the end of file");
+ }
+ }
+
+ @Override
+ public int read(ByteBuffer byteBuffer) throws IOException {
+ try {
+ byte[] data = reader.pread(fd, currentPos, byteBuffer.remaining());
+ byteBuffer.put(data);
+ currentPos += data.length;
+ return data.length;
+ } catch (BrokerReader.EOFException e) {
+ return -1;
+ }
+ }
+
+ @Override
+ public void readFully(ByteBuffer byteBuffer) throws IOException {
+ long markCurPos = currentPos;
+ while (byteBuffer.remaining() > 0) {
+ try {
+ byte[] data = reader.pread(fd, currentPos, byteBuffer.remaining());
+ byteBuffer.put(data);
+ currentPos += data.length;
+ } catch (BrokerReader.EOFException e) {
+ if (byteBuffer.remaining() > 0) {
+ throw new EOFException("Reach the end of file with " + byteBuffer.remaining() + " bytes left to read. "
+ + "read len: " + (currentPos - markCurPos));
+ }
+ }
+ }
+ }
+ }; // end of new SeekableInputStream
+ } // end of newStream
+}
+
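BrokerInputFile implements parquet-mr's InputFile, so a parquet file stored behind a broker can be read without downloading it first. A minimal sketch, assuming a broker registered as "my_broker" with placeholder credentials and file URL (this mirrors how ParquetReader below wires it up):

    Map<String, String> props = Maps.newHashMap();
    props.put("username", "user");   // placeholder credentials
    props.put("password", "passwd");
    BrokerDesc brokerDesc = new BrokerDesc("my_broker", props);
    InputFile inputFile = BrokerInputFile.create("hdfs://host:port/path/file.parquet", brokerDesc);
    try (ParquetFileReader reader =
            new ParquetFileReader(inputFile, ParquetReadOptions.builder().build())) {
        System.out.println(reader.getFileMetaData().getSchema());
    }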
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/parquet/LocalInputFile.java b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/LocalInputFile.java
new file mode 100644
index 0000000..6a2cf92
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/LocalInputFile.java
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.parquet;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+
+/**
+ * Modified version of
+ * https://github.com/tideworks/arvo2parquet/blob/master/src/main/java/com/tideworks/data_load/io/InputFile.java
+ */
+public class LocalInputFile implements InputFile {
+ private static final Logger LOG = LogManager.getLogger(LocalInputFile.class);
+
+ private static final int COPY_BUFFER_SIZE = 8192;
+ private final RandomAccessFile input;
+
+ public LocalInputFile(File file) throws FileNotFoundException {
+ this.input = new RandomAccessFile(file, "r");
+ }
+
+ private static int readDirectBuffer(ByteBuffer byteBufr, byte[] tmpBuf, ByteBufReader rdr)
+ throws IOException {
+ // copy all the bytes that return immediately, stopping at the first
+ // read that doesn't return a full buffer.
+ int nextReadLength = Math.min(byteBufr.remaining(), tmpBuf.length);
+ int totalBytesRead = 0;
+ int bytesRead;
+
+ while ((bytesRead = rdr.read(tmpBuf, 0, nextReadLength)) == tmpBuf.length) {
+ byteBufr.put(tmpBuf);
+ totalBytesRead += bytesRead;
+ nextReadLength = Math.min(byteBufr.remaining(), tmpBuf.length);
+ }
+
+ if (bytesRead < 0) {
+ // return -1 if nothing was read
+ return totalBytesRead == 0 ? -1 : totalBytesRead;
+ } else {
+ // copy the last partial buffer
+ byteBufr.put(tmpBuf, 0, bytesRead);
+ totalBytesRead += bytesRead;
+ return totalBytesRead;
+ }
+ }
+
+ private static void readFullyDirectBuffer(ByteBuffer byteBufr, byte[] tmpBuf, ByteBufReader rdr)
+ throws IOException {
+ int nextReadLength = Math.min(byteBufr.remaining(), tmpBuf.length);
+ int bytesRead = 0;
+
+ while (nextReadLength > 0 && (bytesRead = rdr.read(tmpBuf, 0, nextReadLength)) >= 0) {
+ byteBufr.put(tmpBuf, 0, bytesRead);
+ nextReadLength = Math.min(byteBufr.remaining(), tmpBuf.length);
+ }
+
+ if (bytesRead < 0 && byteBufr.remaining() > 0) {
+ throw new EOFException(
+ "Reached the end of stream with " + byteBufr.remaining() + " bytes left to read");
+ }
+ }
+
+ @Override
+ public long getLength() throws IOException {
+ return input.length();
+ }
+
+ @Override
+ public SeekableInputStream newStream() {
+ return new SeekableInputStream() {
+ private final byte[] tmpBuf = new byte[COPY_BUFFER_SIZE];
+ private long markPos = 0;
+
+ @Override
+ public int read() throws IOException {
+ return input.read();
+ }
+
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public int read(byte[] b) throws IOException {
+ return input.read(b);
+ }
+
+ @SuppressWarnings("NullableProblems")
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ return input.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ final long savPos = input.getFilePointer();
+ final long amtLeft = input.length() - savPos;
+ n = Math.min(n, amtLeft);
+ final long newPos = savPos + n;
+ input.seek(newPos);
+ final long curPos = input.getFilePointer();
+ return curPos - savPos;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ input.close();
+ }
+
+ @SuppressWarnings({"unchecked", "unused", "UnusedReturnValue"})
+ private <T extends Throwable, R> R uncheckedExceptionThrow(Throwable t) throws T {
+ throw (T) t;
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ try {
+ markPos = input.getFilePointer();
+ } catch (IOException e) {
+ uncheckedExceptionThrow(e);
+ }
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ input.seek(markPos);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return true;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return input.getFilePointer();
+ }
+
+ @Override
+ public void seek(long l) throws IOException {
+ input.seek(l);
+ }
+
+ @Override
+ public void readFully(byte[] bytes) throws IOException {
+ input.readFully(bytes);
+ }
+
+ @Override
+ public void readFully(byte[] bytes, int i, int i1) throws IOException {
+ input.readFully(bytes, i, i1);
+ }
+
+ @Override
+ public int read(ByteBuffer byteBuffer) throws IOException {
+ return readDirectBuffer(byteBuffer, tmpBuf, input::read);
+ }
+
+ @Override
+ public void readFully(ByteBuffer byteBuffer) throws IOException {
+ readFullyDirectBuffer(byteBuffer, tmpBuf, input::read);
+ }
+ };
+ }
+
+ private interface ByteBufReader {
+
+ int read(byte[] b, int off, int len) throws IOException;
+ }
+}
+
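LocalInputFile provides the same InputFile abstraction over a local RandomAccessFile. A small sketch (the path is a placeholder) that reads the 4-byte parquet magic through the stream:

    InputFile in = new LocalInputFile(new File("/path/to/local.parquet"));
    try (SeekableInputStream stream = in.newStream()) {
        byte[] magic = new byte[4];
        stream.readFully(magic);
        System.out.println(new String(magic)); // "PAR1" for a valid parquet file
    }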
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/parquet/ParquetReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/ParquetReader.java
new file mode 100644
index 0000000..5850af1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/parquet/ParquetReader.java
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.parquet;
+
+import org.apache.doris.analysis.BrokerDesc;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.RecordReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+public class ParquetReader {
+
+ private ParquetFileReader fileReader;
+
+ private ParquetReader(InputFile inputFile) throws IOException {
+ ParquetReadOptions readOptions = ParquetReadOptions.builder().build();
+ this.fileReader = new ParquetFileReader(inputFile, readOptions);
+ }
+
+ public static ParquetReader create(String filePath, BrokerDesc brokerDesc) throws IOException {
+ BrokerInputFile inputFile = BrokerInputFile.create(filePath, brokerDesc);
+ return new ParquetReader(inputFile);
+ }
+
+ public static ParquetReader create(String filePath) throws IOException {
+ LocalInputFile inputFile = new LocalInputFile(new File(filePath));
+ return new ParquetReader(inputFile);
+ }
+
+ // For test only. The given ip/port is the broker's address.
+ public static ParquetReader create(String filePath, BrokerDesc brokerDesc, String ip, int port) throws IOException {
+ BrokerInputFile inputFile = BrokerInputFile.create(filePath, brokerDesc, ip, port);
+ return new ParquetReader(inputFile);
+ }
+
+ // Get the file schema as a list of column names
+ public List<String> getSchema(boolean debug) {
+ List<String> colNames = Lists.newArrayList();
+ FileMetaData metaData = fileReader.getFileMetaData();
+ MessageType messageType = metaData.getSchema();
+ List<ColumnDescriptor> columnDescriptors = messageType.getColumns();
+ for (ColumnDescriptor column : columnDescriptors) {
+ if (debug) {
+ colNames.add(column.toString());
+ } else {
+ String colName = column.getPath()[0];
+ if (column.getMaxDefinitionLevel() > 1) {
+ // this is a nested column; append its max definition level
+ colName += " (" + column.getMaxDefinitionLevel() + ")";
+ }
+ colNames.add(colName);
+ }
+ }
+ return colNames;
+ }
+
+ // Get at most 'limit' rows of the file content as a 2-dimensional array
+ public List<List<String>> getLines(int limit) throws IOException {
+ List<List<String>> lines = Lists.newArrayList();
+ FileMetaData metaData = fileReader.getFileMetaData();
+ MessageType schema = metaData.getSchema();
+ final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
+ int readLines = 0;
+ PageReadStore pages = null;
+ while (null != (pages = fileReader.readNextRowGroup()) && readLines < limit) {
+ final long rows = pages.getRowCount();
+ final RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
+ for (int i = 0; i < rows && readLines < limit; i++) {
+ List<String> line = Lists.newArrayList();
+ final Group g = recordReader.read();
+ parseGroup(g, line);
+ lines.add(line);
+ readLines++;
+ }
+ }
+ return lines;
+ }
+
+ private void parseGroup(Group g, List<String> line) {
+ int fieldCount = g.getType().getFieldCount();
+ for (int field = 0; field < fieldCount; field++) {
+ int valueCount = g.getFieldRepetitionCount(field);
+ Type fieldType = g.getType().getType(field);
+ String fieldName = fieldType.getName();
+ if (valueCount == 1) {
+ line.add(g.getValueToString(field, 0));
+ } else if (valueCount > 1) {
+ List<String> array = Lists.newArrayList();
+ for (int index = 0; index < valueCount; index++) {
+ if (fieldType.isPrimitive()) {
+ array.add(g.getValueToString(field, index));
+ }
+ }
+ line.add("[" + Joiner.on(",").join(array) + "]");
+ } else {
+ line.add("");
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ fileReader.close();
+ }
+}
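Typical usage of ParquetReader, as exercised by ParquetReaderTest further below; the file path is a placeholder:

    ParquetReader reader = ParquetReader.create("/path/to/file.parquet");
    try {
        System.out.println(reader.getSchema(false));   // column names
        for (List<String> row : reader.getLines(10)) { // at most 10 rows
            System.out.println(String.join(",", row));
        }
    } finally {
        reader.close();
    }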
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java
new file mode 100644
index 0000000..b59e298
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerReader.java
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.UserException;
+import org.apache.doris.service.FrontendOptions;
+import org.apache.doris.thrift.TBrokerCloseReaderRequest;
+import org.apache.doris.thrift.TBrokerFD;
+import org.apache.doris.thrift.TBrokerFileStatus;
+import org.apache.doris.thrift.TBrokerListPathRequest;
+import org.apache.doris.thrift.TBrokerListResponse;
+import org.apache.doris.thrift.TBrokerOpenReaderRequest;
+import org.apache.doris.thrift.TBrokerOpenReaderResponse;
+import org.apache.doris.thrift.TBrokerOperationStatus;
+import org.apache.doris.thrift.TBrokerOperationStatusCode;
+import org.apache.doris.thrift.TBrokerPReadRequest;
+import org.apache.doris.thrift.TBrokerReadResponse;
+import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPaloBrokerService;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+
+public class BrokerReader {
+ private static final Logger LOG = LogManager.getLogger(BrokerReader.class);
+
+ private BrokerDesc brokerDesc;
+ private TNetworkAddress address;
+ private TPaloBrokerService.Client client;
+ private long currentPos;
+
+ public static BrokerReader create(BrokerDesc brokerDesc) throws IOException {
+ try {
+ TNetworkAddress address = BrokerUtil.getAddress(brokerDesc);
+ return new BrokerReader(address, BrokerUtil.borrowClient(address), brokerDesc);
+ } catch (UserException e) {
+ throw new IOException(e);
+ }
+ }
+
+ // For test only
+ public static BrokerReader create(BrokerDesc brokerDesc, String ip, int port) throws IOException {
+ try {
+ TNetworkAddress address = new TNetworkAddress(ip, port);
+ return new BrokerReader(address, BrokerUtil.borrowClient(address), brokerDesc);
+ } catch (UserException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private BrokerReader(TNetworkAddress address, TPaloBrokerService.Client client, BrokerDesc brokerDesc) {
+ this.brokerDesc = brokerDesc;
+ this.address = address;
+ this.client = client;
+ }
+
+ public long getCurrentPos() {
+ return currentPos;
+ }
+
+ public byte[] pread(TBrokerFD fd, long offset, int length) throws IOException, EOFException {
+ TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
+ TBrokerVersion.VERSION_ONE, fd, offset, length);
+ TBrokerReadResponse tReadResponse = null;
+ try {
+ tReadResponse = client.pread(tPReadRequest);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ // check END_OF_FILE before the generic status check; otherwise EOF would
+ // surface as an IOException and never reach callers that expect EOFException
+ if (tReadResponse.getOpStatus().getStatusCode() == TBrokerOperationStatusCode.END_OF_FILE) {
+ throw new EOFException();
+ }
+ if (tReadResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+ throw new IOException("Broker pread failed. fd=" + fd.toString() + ", broker=" + address
+ + ", msg=" + tReadResponse.getOpStatus().getMessage());
+ }
+ return tReadResponse.getData();
+ }
+
+ public TBrokerFD open(String path) throws IOException {
+ String clientId = FrontendOptions.getLocalHostAddress() + ":" + Config.rpc_port;
+ TBrokerOpenReaderRequest tOpenReaderRequest = new TBrokerOpenReaderRequest(
+ TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getProperties());
+ TBrokerOpenReaderResponse tOpenReaderResponse = null;
+ try {
+ tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ if (tOpenReaderResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+ throw new IOException("Broker open reader failed. path=" + path + ", broker=" + address
+ + ", msg=" + tOpenReaderResponse.getOpStatus().getMessage());
+ }
+ this.currentPos = 0;
+ return tOpenReaderResponse.getFd();
+ }
+
+ public void close(TBrokerFD fd) {
+ TBrokerCloseReaderRequest tCloseReaderRequest = new TBrokerCloseReaderRequest(
+ TBrokerVersion.VERSION_ONE, fd);
+ TBrokerOperationStatus tOperationStatus = null;
+ try {
+ tOperationStatus = client.closeReader(tCloseReaderRequest);
+ } catch (TException e) {
+ LOG.warn("Broker close reader failed. fd={}, address={}", fd.toString(), address, e);
+ }
+ if (tOperationStatus == null || tOperationStatus.getStatusCode() != TBrokerOperationStatusCode.OK) {
+ LOG.warn("Broker close reader failed. fd={}, address={}, error={}", fd.toString(), address,
+ tOperationStatus.getMessage());
+ }
+ }
+
+ public long getFileLength(String path) throws IOException {
+ TBrokerListPathRequest request = new TBrokerListPathRequest(
+ TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getProperties());
+ TBrokerListResponse tBrokerListResponse = null;
+ try {
+ tBrokerListResponse = client.listPath(request);
+ } catch (TException e) {
+ throw new IOException(e);
+ }
+ if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+ throw new IOException("Broker list path failed. path=" + path
+ + ",broker=" + address + ",msg=" + tBrokerListResponse.getOpStatus().getMessage());
+ }
+
+ if (tBrokerListResponse.files.size() != 1) {
+ throw new IOException("Match " + tBrokerListResponse.files.size() + " files. Expected: 1");
+ }
+
+ TBrokerFileStatus fileStatus = tBrokerListResponse.files.get(0);
+ if (fileStatus.isDir) {
+ throw new IOException("Meet dir. Expect file");
+ }
+
+ return fileStatus.size;
+ }
+
+ public static class EOFException extends Exception {
+ public EOFException() {
+ super();
+ }
+ }
+}
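The expected calling sequence for BrokerReader is getFileLength/open, repeated pread calls, then close, which is exactly what BrokerInputFile above does. A hedged sketch with placeholder paths:

    BrokerReader reader = BrokerReader.create(brokerDesc);
    String path = "hdfs://host:port/path/file";
    long fileLength = reader.getFileLength(path);
    TBrokerFD fd = reader.open(path);
    try {
        byte[] head = reader.pread(fd, 0, (int) Math.min(fileLength, 4096));
        // ... consume head ...
    } catch (BrokerReader.EOFException e) {
        // no data at the requested offset
    } finally {
        reader.close(fd);
    }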
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index cc73098..5295f96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -245,7 +245,7 @@ public class BrokerUtil {
* @return byte[]
* @throws UserException if broker op failed or not only one file
*/
- public static byte[] readFile(String path, BrokerDesc brokerDesc) throws UserException {
+ public static byte[] readFile(String path, BrokerDesc brokerDesc, long maxReadLen) throws UserException {
TNetworkAddress address = getAddress(brokerDesc);
TPaloBrokerService.Client client = borrowClient(address);
boolean failed = true;
@@ -292,8 +292,12 @@ public class BrokerUtil {
fd = tOpenReaderResponse.getFd();
// read
+ long readLen = fileSize;
+ if (maxReadLen > 0 && maxReadLen < fileSize) {
+ readLen = maxReadLen;
+ }
TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
- TBrokerVersion.VERSION_ONE, fd, 0, fileSize);
+ TBrokerVersion.VERSION_ONE, fd, 0, readLen);
TBrokerReadResponse tReadResponse = null;
try {
tReadResponse = client.pread(tPReadRequest);
@@ -506,7 +510,7 @@ public class BrokerUtil {
return new TNetworkAddress(broker.ip, broker.port);
}
- private static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException {
+ public static TPaloBrokerService.Client borrowClient(TNetworkAddress address) throws UserException {
TPaloBrokerService.Client client = null;
try {
client = ClientPool.brokerPool.borrowObject(address);
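The new third parameter of BrokerUtil.readFile caps how many bytes are fetched: per the hunk above, a positive maxReadLen smaller than the file size truncates the read, while 0 (or any non-positive value) keeps the old whole-file behavior. For example:

    // sample at most the first 1MB of a potentially huge file
    byte[] head = BrokerUtil.readFile(path, brokerDesc, 1024 * 1024);
    // read the entire file, as before this change
    byte[] all = BrokerUtil.readFile(path, brokerDesc, 0);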
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ImportAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ImportAction.java
new file mode 100644
index 0000000..267ae98
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/ImportAction.java
@@ -0,0 +1,228 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.restv2;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.parquet.ParquetReader;
+import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.thrift.TBrokerFileStatus;
+
+import com.google.common.collect.Lists;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@RestController
+@RequestMapping("/rest/v2")
+public class ImportAction {
+
+ private static final Logger LOG = LogManager.getLogger(ImportAction.class);
+
+ private static final long MAX_READ_LEN_BYTES = 1024 * 1024; // 1MB
+
+ private static final String FORMAT_CSV = "CSV";
+ private static final String FORMAT_PARQUET = "PARQUET";
+ private static final String FORMAT_ORC = "ORC";
+
+ private static final int MAX_SAMPLE_LINE = 50;
+
+ /**
+ * Request body:
+ * {
+ * "fileInfo": {
+ * "columnSeparator": ",",
+ * "fileUrl": "hdfs://127.0.0.1:50070/file/test/text*",
+ * "format": "TXT" // TXT or PARQUET
+ * },
+ * "connectInfo": { // Optional
+ * "brokerName" : "my_broker",
+ * "brokerProps" : {
+ * "username" : "yyy",
+ * "password" : "xxx"
+ * }
+ * }
+ * }
+ */
+ @RequestMapping(path = "/api/import/file_review", method = RequestMethod.POST)
+ public Object fileReview(@RequestBody FileReviewRequestVo body, HttpServletRequest request, HttpServletResponse response) {
+ FileInfo fileInfo = body.getFileInfo();
+ ConnectInfo connectInfo = body.getConnectInfo();
+ BrokerDesc brokerDesc = new BrokerDesc(connectInfo.getBrokerName(), connectInfo.getBrokerProps());
+
+ List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
+ try {
+ // get file status
+ BrokerUtil.parseFile(fileInfo.getFileUrl(), brokerDesc, fileStatuses);
+ // create response
+ FileReviewResponseVo reviewResponseVo = createFileReviewResponse(brokerDesc, fileInfo, fileStatuses);
+ return ResponseEntityBuilder.ok(reviewResponseVo);
+ } catch (UserException e) {
+ return ResponseEntityBuilder.okWithCommonError(e.getMessage());
+ }
+ }
+
+ private FileReviewResponseVo createFileReviewResponse(BrokerDesc brokerDesc, FileInfo fileInfo,
+ List<TBrokerFileStatus> fileStatuses) throws UserException {
+ FileReviewResponseVo responseVo = new FileReviewResponseVo();
+ // set file review statistic
+ FileReviewStatistic statistic = new FileReviewStatistic();
+ statistic.setFileNumber(fileStatuses.size());
+ long totalFileSize = 0;
+ for (TBrokerFileStatus fStatus : fileStatuses) {
+ if (fStatus.isDir) {
+ throw new UserException("Not all matched paths are files: " + fStatus.path);
+ }
+ totalFileSize += fStatus.size;
+ }
+ statistic.setFileSize(totalFileSize);
+ responseVo.setReviewStatistic(statistic);
+
+ if (fileStatuses.isEmpty()) {
+ return responseVo;
+ }
+
+ // Begin to preview first file.
+ TBrokerFileStatus sampleFile = fileStatuses.get(0);
+ FileSample fileSample = new FileSample();
+ fileSample.setSampleFileName(sampleFile.path);
+
+ if (fileInfo.format.equalsIgnoreCase(FORMAT_CSV)) {
+ byte[] fileContentBytes = BrokerUtil.readFile(sampleFile.path, brokerDesc, MAX_READ_LEN_BYTES);
+ parseContent(fileInfo.columnSeparator, "\n", fileContentBytes, fileSample);
+ } else if (fileInfo.format.equalsIgnoreCase(FORMAT_PARQUET)) {
+ try {
+ ParquetReader parquetReader = ParquetReader.create(sampleFile.path, brokerDesc);
+ parseParquet(parquetReader, fileSample);
+ } catch (IOException e) {
+ LOG.warn("failed to get sample data of parquet file: {}", sampleFile.path, e);
+ throw new UserException("failed to get sample data of parquet file. " + e.getMessage());
+ }
+ } else {
+ throw new UserException("Only support CSV or PARQUET file format");
+ }
+
+ responseVo.setFileSample(fileSample);
+ return responseVo;
+ }
+
+ private void parseContent(String columnSeparator, String lineDelimiter, byte[] fileContentBytes,
+ FileSample fileSample) {
+ List<List<String>> sampleLines = Lists.newArrayList();
+ int maxColSize = 0;
+ String content = new String(fileContentBytes);
+ String[] lines = content.split(lineDelimiter);
+ for (String line : lines) {
+ if (sampleLines.size() >= MAX_SAMPLE_LINE) {
+ break;
+ }
+ String[] cols = line.split(columnSeparator);
+ List<String> row = Lists.newArrayList(cols);
+ sampleLines.add(row);
+ maxColSize = Math.max(maxColSize, row.size());
+ }
+
+ fileSample.setFileLineNumber(sampleLines.size());
+ fileSample.setMaxColumnSize(maxColSize);
+ fileSample.setSampleFileLines(sampleLines);
+ }
+
+ private void parseParquet(ParquetReader reader, FileSample fileSample) throws IOException {
+ fileSample.setColNames(reader.getSchema(false));
+ fileSample.setMaxColumnSize(fileSample.colNames.size());
+ fileSample.setSampleFileLines(reader.getLines(MAX_SAMPLE_LINE));
+ fileSample.setFileLineNumber(fileSample.sampleFileLines.size());
+ }
+
+ @Getter
+ @Setter
+ public static class FileReviewRequestVo {
+ private FileInfo fileInfo;
+ private ConnectInfo connectInfo;
+ }
+
+ @Getter
+ @Setter
+ public static class FileInfo {
+ private String columnSeparator;
+ private String fileUrl;
+ private String format;
+ }
+
+ @Getter
+ @Setter
+ public static class ConnectInfo {
+ private String brokerName;
+ private Map<String, String> brokerProps;
+ }
+
+ @Getter
+ @Setter
+ public static class FileReviewResponseVo {
+ private FileReviewStatistic reviewStatistic;
+ private FileSample fileSample;
+ }
+
+ @Getter
+ @Setter
+ public static class FileReviewStatistic {
+ private int fileNumber;
+ private long fileSize;
+ }
+
+ @Getter
+ @Setter
+ public static class FileSample {
+ private String sampleFileName;
+ private int fileLineNumber;
+ private int maxColumnSize;
+ private List<String> colNames;
+ private List<List<String>> sampleFileLines;
+ }
+
+
+ public static void main(String[] args) {
+ ImportAction importAction = new ImportAction();
+ String str = "1,2,3\n4,5,6\n,7,8,9,中国";
+ byte[] fileContentBytes = str.getBytes();
+ System.out.println(fileContentBytes.length);
+ String newStr = new String(fileContentBytes, 0, fileContentBytes.length - 2);
+ System.out.println(newStr);
+
+ FileSample fileSample = new FileSample();
+ importAction.parseContent(",", "\n", newStr.getBytes(), fileSample);
+ System.out.println(fileSample.sampleFileLines);
+ }
+
+}
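A sketch of calling the new endpoint (not part of the patch), posting the request body documented in the javadoc above; host, port, broker name and credentials are placeholders, and authentication is assumed to be handled elsewhere:

    String body = "{"
            + "\"fileInfo\": {\"columnSeparator\": \",\","
            + "\"fileUrl\": \"hdfs://127.0.0.1:50070/file/test/text*\","
            + "\"format\": \"CSV\"},"
            + "\"connectInfo\": {\"brokerName\": \"my_broker\","
            + "\"brokerProps\": {\"username\": \"yyy\", \"password\": \"xxx\"}}"
            + "}";
    HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://127.0.0.1:8030/rest/v2/api/import/file_review"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.body()); // reviewStatistic + fileSample on success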
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/MetaInfoActionV2.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/MetaInfoActionV2.java
new file mode 100644
index 0000000..c3f64e2
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/MetaInfoActionV2.java
@@ -0,0 +1,392 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.restv2;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.UserException;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.httpv2.exception.BadRequestException;
+import org.apache.doris.httpv2.rest.RestBaseController;
+import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.system.SystemInfoService;
+
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Get meta info like databases, tables and schemas
+ */
+@RestController
+@RequestMapping("/rest/v2")
+public class MetaInfoActionV2 extends RestBaseController {
+
+ private static final String NAMESPACES = "namespaces";
+ private static final String DATABASES = "databases";
+ private static final String TABLES = "tables";
+ private static final String PARAM_LIMIT = "limit";
+ private static final String PARAM_OFFSET = "offset";
+ private static final String PARAM_WITH_MV = "with_mv";
+
+ /**
+ * Get all databases
+ * {
+ * "msg": "success",
+ * "code": 0,
+ * "data": [
+ * "default_cluster:db1",
+ * "default_cluster:doris_audit_db__",
+ * "default_cluster:information_schema"
+ * ],
+ * "count": 0
+ * }
+ */
+ @RequestMapping(path = "/api/meta/" + NAMESPACES + "/{" + NS_KEY + "}/" + DATABASES,
+ method = {RequestMethod.GET})
+ public Object getAllDatabases(
+ @PathVariable(value = NS_KEY) String ns,
+ HttpServletRequest request, HttpServletResponse response) {
+ checkWithCookie(request, response, false);
+
+ if (!ns.equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
+ return ResponseEntityBuilder.badRequest("Only 'default_cluster' is supported for now");
+ }
+
+ // 1. get all database with privilege
+ List<String> dbNames = null;
+ try {
+ dbNames = Catalog.getCurrentCatalog().getClusterDbNames(ns);
+ } catch (AnalysisException e) {
+ return ResponseEntityBuilder.okWithCommonError("namespace does not exist: " + ns);
+ }
+ // keep only the databases the user has SHOW privilege on,
+ // returning full names (e.g. "default_cluster:db1") as documented above
+ List<String> filteredDbNames = Lists.newArrayList();
+ for (String fullName : dbNames) {
+ if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), fullName,
+ PrivPredicate.SHOW)) {
+ continue;
+ }
+ filteredDbNames.add(fullName);
+ }
+
+ Collections.sort(filteredDbNames);
+
+ // handle limit offset
+ Pair<Integer, Integer> fromToIndex = getFromToIndex(request, filteredDbNames.size());
+ return ResponseEntityBuilder.ok(filteredDbNames.subList(fromToIndex.first, fromToIndex.second));
+ }
+
+ /** Get all tables of a database
+ * {
+ * "msg": "success",
+ * "code": 0,
+ * "data": [
+ * "tbl1",
+ * "tbl2"
+ * ],
+ * "count": 0
+ * }
+ */
+
+ @RequestMapping(path = "/api/meta/" + NAMESPACES + "/{" + NS_KEY + "}/" + DATABASES + "/{" + DB_KEY + "}/" + TABLES,
+ method = {RequestMethod.GET})
+ public Object getTables(
+ @PathVariable(value = NS_KEY) String ns, @PathVariable(value = DB_KEY) String dbName,
+ HttpServletRequest request, HttpServletResponse response) {
+ checkWithCookie(request, response, false);
+
+ if (!ns.equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
+ return ResponseEntityBuilder.badRequest("Only 'default_cluster' is supported for now");
+ }
+
+
+ String fullDbName = getFullDbName(dbName);
+ Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
+ if (db == null) {
+ return ResponseEntityBuilder.okWithCommonError("Database does not exist: " + fullDbName);
+ }
+
+ List<String> tblNames = Lists.newArrayList();
+ db.readLock();
+ try {
+ for (Table tbl : db.getTables()) {
+ if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), fullDbName, tbl.getName(),
+ PrivPredicate.SHOW)) {
+ continue;
+ }
+ tblNames.add(tbl.getName());
+ }
+ } finally {
+ db.readUnlock();
+ }
+
+ Collections.sort(tblNames);
+
+ // handle limit offset
+ Pair<Integer, Integer> fromToIndex = getFromToIndex(request, tblNames.size());
+ return ResponseEntityBuilder.ok(tblNames.subList(fromToIndex.first, fromToIndex.second));
+ }
+
+ /**
+ * {
+ * "msg": "success",
+ * "code": 0,
+ * "data": {
+ * "engineType": "OLAP",
+ * "schemaInfo": {
+ * "schemaMap": {
+ * "tbl1": {
+ * "schema": {
+ * "field": "k2",
+ * "type": "INT",
+ * "isNull": "true",
+ * "defaultVal": null,
+ * "key": "true",
+ * "aggrType": "None",
+ * "comment": ""
+ * },
+ * "keyType": "DUP_KEYS",
+ * "baseIndex": true
+ * }
+ * }
+ * }
+ * },
+ * "count": 0
+ * }
+ */
+ @RequestMapping(path = "/api/meta/" + NAMESPACES + "/{" + NS_KEY + "}/" + DATABASES + "/{" + DB_KEY + "}/" + TABLES
+ + "/{" + TABLE_KEY + "}/schema",
+ method = {RequestMethod.GET})
+ public Object getTableSchema(
+ @PathVariable(value = NS_KEY) String ns, @PathVariable(value = DB_KEY) String dbName,
+ @PathVariable(value = TABLE_KEY) String tblName,
+ HttpServletRequest request, HttpServletResponse response) throws UserException {
+ checkWithCookie(request, response, false);
+
+ if (!ns.equalsIgnoreCase(SystemInfoService.DEFAULT_CLUSTER)) {
+ return ResponseEntityBuilder.badRequest("Only 'default_cluster' is supported for now");
+ }
+
+ String fullDbName = getFullDbName(dbName);
+ checkTblAuth(ConnectContext.get().getCurrentUserIdentity(), fullDbName, tblName, PrivPredicate.SHOW);
+
+ Database db = Catalog.getCurrentCatalog().getDb(fullDbName);
+ if (db == null) {
+ return ResponseEntityBuilder.okWithCommonError("Database does not exist: " + fullDbName);
+ }
+
+ String withMvPara = request.getParameter(PARAM_WITH_MV);
+ boolean withMv = Strings.isNullOrEmpty(withMvPara) ? false : withMvPara.equals("1");
+
+
+ TableSchemaInfo tableSchemaInfo = new TableSchemaInfo();
+ db.readLock();
+ try {
+ Table tbl = db.getTable(tblName);
+ if (tbl == null) {
+ return ResponseEntityBuilder.okWithCommonError("Table does not exist: " + tblName);
+ }
+
+ tableSchemaInfo.setEngineType(tbl.getType().toString());
+ SchemaInfo schemaInfo = generateSchemaInfo(tbl, withMv);
+ tableSchemaInfo.setSchemaInfo(schemaInfo);
+ return ResponseEntityBuilder.ok(tableSchemaInfo);
+ } finally {
+ db.readUnlock();
+ }
+ }
+
+ private SchemaInfo generateSchemaInfo(Table tbl, boolean withMv) {
+ SchemaInfo schemaInfo = new SchemaInfo();
+ Map<String, TableSchema> schemaMap = Maps.newHashMap();
+ if (tbl.getType() == Table.TableType.OLAP) {
+ OlapTable olapTable = (OlapTable) tbl;
+ long baseIndexId = olapTable.getBaseIndexId();
+ TableSchema baseTableSchema = new TableSchema();
+ baseTableSchema.setBaseIndex(true);
+ baseTableSchema.setKeyType(olapTable.getKeysTypeByIndexId(baseIndexId).name());
+ Schema baseSchema = generateColumnSchema(olapTable.getSchemaByIndexId(baseIndexId));
+ baseTableSchema.setSchema(baseSchema);
+ schemaMap.put(olapTable.getIndexNameById(baseIndexId), baseTableSchema);
+
+ if (withMv) {
+ for (long indexId : olapTable.getIndexIdListExceptBaseIndex()) {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setBaseIndex(false);
+ tableSchema.setKeyType(olapTable.getKeysTypeByIndexId(indexId).name());
+ Schema schema = generateColumnSchema(olapTable.getSchemaByIndexId(indexId));
+ tableSchema.setSchema(schema);
+ schemaMap.put(olapTable.getIndexNameById(indexId), tableSchema);
+ }
+ }
+
+ } else {
+ TableSchema tableSchema = new TableSchema();
+ tableSchema.setBaseIndex(false);
+ Schema schema = generateColumnSchema(tbl.getBaseSchema());
+ tableSchema.setSchema(schema);
+ schemaMap.put(tbl.getName(), tableSchema);
+ }
+ schemaInfo.setSchemaMap(schemaMap);
+ return schemaInfo;
+ }
+
+ private Schema generateColumnSchema(List<Column> columns) {
+ Schema schema = new Schema();
+ for (Column column : columns) {
+ schema.setField(column.getName());
+ schema.setType(column.getType().toString());
+ schema.setIsNull(String.valueOf(column.isAllowNull()));
+ schema.setDefaultVal(column.getDefaultValue());
+ schema.setKey(String.valueOf(column.isKey()));
+ schema.setAggrType(column.getAggregationType() == null ?
+ "None" : column.getAggregationType().toString());
+ schema.setComment(column.getComment());
+ }
+ return schema;
+ }
+
+ private void generateResult(Table tbl, boolean isBaseIndex,
+ Map<String, Map<String, Object>> result) throws UserException {
+ Map<String, Object> propMap = result.get(tbl.getName());
+ if (propMap == null) {
+ propMap = Maps.newHashMap();
+ result.put(tbl.getName(), propMap);
+ }
+
+ propMap.put("isBase", isBaseIndex);
+ propMap.put("tableType", tbl.getEngine());
+ if (tbl.getType() == Table.TableType.OLAP) {
+ propMap.put("keyType", ((OlapTable)tbl).getKeysType());
+ }
+ propMap.put("schema", generateSchema(tbl.getBaseSchema()));
+ }
+
+ List<Map<String, String>> generateSchema(List<Column> columns) throws UserException {
+ List<Map<String, String>> schema = Lists.newArrayList();
+ for (Column column : columns) {
+ Map<String, String> colSchema = Maps.newHashMap();
+ colSchema.put("Field", column.getName());
+ colSchema.put("Type", column.getType().toString());
+ colSchema.put("Null", String.valueOf(column.isAllowNull()));
+ colSchema.put("Default", column.getDefaultValue());
+ colSchema.put("Key", String.valueOf(column.isKey()));
+ colSchema.put("AggType", column.getAggregationType().toString());
+ colSchema.put("Comment", column.getComment());
+ schema.add(colSchema);
+ }
+ return schema;
+ }
+
+ private String convertIfNull(String val) {
+ return val.equals(FeConstants.null_string) ? null : val;
+ }
+
+ // get limit and offset from query parameter
+ // and return fromIndex and toIndex of a list
+ private Pair<Integer, Integer> getFromToIndex(HttpServletRequest request, int maxNum) {
+ String limitStr = request.getParameter(PARAM_LIMIT);
+ String offsetStr = request.getParameter(PARAM_OFFSET);
+
+ int offset = 0;
+ int limit = Integer.MAX_VALUE;
+ if (Strings.isNullOrEmpty(limitStr)) {
+ // limit not set
+ if (!Strings.isNullOrEmpty(offsetStr)) {
+ throw new BadRequestException("Param offset should be set with param limit");
+ }
+ } else {
+ // limit is set
+ limit = Integer.valueOf(limitStr);
+ if (limit < 0) {
+ throw new BadRequestException("Param limit should >= 0");
+ }
+
+ offset = 0;
+ if (!Strings.isNullOrEmpty(offsetStr)) {
+ offset = Integer.valueOf(offsetStr);
+ if (offset < 0) {
+ throw new BadRequestException("Param offset should >= 0");
+ }
+ }
+ }
+
+ if (maxNum <= 0) {
+ return Pair.create(0, 0);
+ }
+ return Pair.create(Math.min(offset, maxNum - 1), Math.min(limit + offset, maxNum));
+ }
+
+ @Getter
+ @Setter
+ public static class TableSchemaInfo {
+ private String engineType;
+ private SchemaInfo schemaInfo;
+ }
+
+ @Getter
+ @Setter
+ public static class SchemaInfo {
+ // tbl(index name) -> schema
+ private Map<String, TableSchema> schemaMap;
+ }
+
+ @Getter
+ @Setter
+ public static class TableSchema {
+ private Schema schema;
+ private boolean isBaseIndex;
+ private String keyType;
+ }
+
+ @Getter
+ @Setter
+ public static class Schema {
+ private String field;
+ private String type;
+ private String isNull;
+ private String defaultVal;
+ private String key;
+ private String aggrType;
+ private String comment;
+ }
+}
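Putting the routes together, the meta endpoints form a namespaces/databases/tables hierarchy with optional limit/offset paging and a with_mv=1 switch to include materialized-view indexes. Illustrative URLs (host, port and object names are placeholders):

    String base = "http://127.0.0.1:8030/rest/v2/api/meta";
    // page through databases, 10 at a time
    String dbsUrl = base + "/namespaces/default_cluster/databases?limit=10&offset=0";
    // tables of one database
    String tblsUrl = base + "/namespaces/default_cluster/databases/db1/tables";
    // schema of one table, including materialized-view indexes
    String schemaUrl = base + "/namespaces/default_cluster/databases/db1/tables/tbl1/schema?with_mv=1";
    HttpResponse<String> resp = HttpClient.newHttpClient().send(
            HttpRequest.newBuilder().uri(URI.create(schemaUrl)).GET().build(),
            HttpResponse.BodyHandlers.ofString());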
diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java
new file mode 100644
index 0000000..6fff16f
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/restv2/StatisticAction.java
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.httpv2.restv2;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
+import org.apache.doris.httpv2.rest.RestBaseController;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+import com.google.common.collect.Maps;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.util.List;
+import java.util.Map;
+
+@RestController
+@RequestMapping("/rest/v2")
+public class StatisticAction extends RestBaseController {
+
+ private static final Logger LOG = LogManager.getLogger(StatisticAction.class);
+
+ @RequestMapping(path = "/api/cluster_overview", method = RequestMethod.GET)
+ public Object clusterOverview(HttpServletRequest request, HttpServletResponse response) {
+ Map<String, Object> resultMap = Maps.newHashMap();
+ Catalog catalog = Catalog.getCurrentCatalog();
+ SystemInfoService infoService = Catalog.getCurrentSystemInfo();
+
+ resultMap.put("dbCount", catalog.getDbIds().size());
+ resultMap.put("tblCount", getTblCount(catalog));
+ resultMap.put("diskOccupancy", getDiskOccupancy(infoService));
+ resultMap.put("beCount", infoService.getClusterBackendIds(SystemInfoService.DEFAULT_CLUSTER).size());
+ resultMap.put("feCount", catalog.getFrontends(null).size());
+ resultMap.put("remainDisk", getRemainDisk(infoService));
+
+ return ResponseEntityBuilder.ok(resultMap);
+ }
+
+ private int getTblCount(Catalog catalog) {
+ int tblCount = 0;
+ List<Long> dbIds = catalog.getDbIds();
+ for (long dbId : dbIds) {
+ Database db = catalog.getDb(dbId);
+ if (db == null) {
+ // the database may have been dropped concurrently
+ continue;
+ }
+ tblCount += db.getTables().size();
+ }
+ return tblCount;
+ }
+
+ private long getDiskOccupancy(SystemInfoService infoService) {
+ long diskOccupancy = 0;
+ List<Backend> backends = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
+ for (Backend be : backends) {
+ diskOccupancy += be.getDataUsedCapacityB();
+ }
+ return diskOccupancy;
+ }
+
+ private long getRemainDisk(SystemInfoService infoService) {
+ long remainDisk = 0;
+ List<Backend> backends = infoService.getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
+ for (Backend be : backends) {
+ remainDisk += be.getAvailableCapacityB();
+ }
+ return remainDisk;
+ }
+}
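The handler simply aggregates counts and capacities into a map, so the reply follows the usual ResponseEntityBuilder envelope. An illustrative response in the same style as the javadoc samples above (field names come from the code; the values are made up):

    {
        "msg": "success",
        "code": 0,
        "data": {
            "dbCount": 3,
            "tblCount": 42,
            "diskOccupancy": 1073741824,
            "beCount": 3,
            "feCount": 1,
            "remainDisk": 107374182400
        },
        "count": 0
    }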
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 21f6431..f26793a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -33,12 +33,6 @@ import org.apache.doris.load.loadv2.etl.EtlJobConfig;
import org.apache.doris.load.loadv2.etl.SparkEtlJob;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TEtlState;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -47,6 +41,13 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+
import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
@@ -245,7 +246,7 @@ public class SparkEtlJobHandler {
// get dpp result
String dppResultFilePath = EtlJobConfig.getDppResultFilePath(etlOutputPath);
try {
- byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc);
+ byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc, 0);
String dppResultStr = new String(data, "UTF-8");
DppResult dppResult = new Gson().fromJson(dppResultStr, DppResult.class);
if (dppResult != null) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/parquet/ParquetReaderTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/parquet/ParquetReaderTest.java
new file mode 100644
index 0000000..6c0896c
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/parquet/ParquetReaderTest.java
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.parquet;
+
+import org.apache.doris.analysis.BrokerDesc;
+
+import com.google.common.collect.Maps;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ParquetReaderTest {
+ private static final Logger LOG = LogManager.getLogger(ParquetReaderTest.class);
+
+ // Before running this test, set the local file path, the remote
+ // file path, the ak/sk properties and the broker address below.
+ @Ignore
+ @Test
+ public void testWrongFormat() {
+ try {
+ // local
+ String file = "/path/to/local/all_type_none.parquet";
+ ParquetReader reader = ParquetReader.create(file);
+ LOG.info(reader.getSchema(false));
+
+ // remote
+ String file2 = "bos://cmy-repe/all_type_none.parquet";
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("bos_endpoint", "http://bj.bcebos.com");
+ properties.put("bos_accesskey", "1");
+ properties.put("bos_secret_accesskey", "2");
+ BrokerDesc brokerDesc = new BrokerDesc("dummy", properties);
+
+ ParquetReader reader2 = ParquetReader.create(file2, brokerDesc, "127.0.0.1", 8118);
+ LOG.info(reader2.getSchema(false));
+ } catch (Exception e) {
+ LOG.info("error: ", e);
+ }
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
index 4be5d0f..af7d327 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/BrokerUtilTest.java
@@ -17,9 +17,6 @@
package org.apache.doris.common.util;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Catalog;
@@ -48,11 +45,7 @@ import org.apache.doris.thrift.TPaloBrokerService;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import mockit.Expectations;
-import mockit.Injectable;
-import mockit.Mock;
-import mockit.Mocked;
-import mockit.MockUp;
+
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;
@@ -61,6 +54,14 @@ import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.List;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mock;
+import mockit.MockUp;
+import mockit.Mocked;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
public class BrokerUtilTest {
@Test
@@ -218,7 +219,7 @@ public class BrokerUtilTest {
};
BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
- byte[] data = BrokerUtil.readFile(filePath, brokerDesc);
+ byte[] data = BrokerUtil.readFile(filePath, brokerDesc, 0);
String readStr = new String(data, "UTF-8");
Assert.assertEquals(dppResultStr, readStr);
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
index 73df628..a462630 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkEtlJobHandlerTest.java
@@ -46,14 +46,15 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.spark.launcher.SparkLauncher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -233,7 +234,7 @@ public class SparkEtlJobHandlerTest {
minTimes = 0;
result = commandResult;
- BrokerUtil.readFile(anyString, (BrokerDesc) any);
+ BrokerUtil.readFile(anyString, (BrokerDesc) any, anyLong);
result = "{'normal_rows': 10, 'abnormal_rows': 0, 'failed_reason': 'etl job failed'}";
}
};
diff --git a/fe/pom.xml b/fe/pom.xml
index 0665f65..c0c3f50 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -672,6 +672,14 @@ under the License.
<version>1.2</version>
</dependency>
+ <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.16</version>
+ <scope>provided</scope>
+ </dependency>
+
</dependencies>
</dependencyManagement>