Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/06/18 06:08:05 UTC

[GitHub] [incubator-doris] wyb commented on a change in pull request #3716: [Spark load][Fe 5/6] Fe submit spark etl job

wyb commented on a change in pull request #3716:
URL: https://github.com/apache/incubator-doris/pull/3716#discussion_r441988075



##########
File path: fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
##########
@@ -139,4 +148,352 @@ public static String printBroker(String brokerName, TNetworkAddress address) {
         return Lists.newArrayList(columns);
     }
 
+    /**
+     * Read binary data from path with broker
+     * @param path
+     * @param brokerDesc
+     * @return byte[]
+     * @throws UserException if broker op failed or not only one file
+     */
+    public static byte[] readFile(String path, BrokerDesc brokerDesc) throws UserException {
+        TNetworkAddress address = getAddress(brokerDesc);
+        TPaloBrokerService.Client client = borrowClient(address);
+        boolean failed = true;
+        TBrokerFD fd = null;
+        try {
+            // get file size
+            TBrokerListPathRequest request = new TBrokerListPathRequest(
+                    TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getProperties());
+            TBrokerListResponse tBrokerListResponse = null;
+            try {
+                tBrokerListResponse = client.listPath(request);
+            } catch (TException e) {
+                reopenClient(client);
+                tBrokerListResponse = client.listPath(request);
+            }
+            if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker list path failed. path=" + path + ", broker=" + address
+                                                + ",msg=" + tBrokerListResponse.getOpStatus().getMessage());
+            }
+            List<TBrokerFileStatus> fileStatuses = tBrokerListResponse.getFiles();
+            if (fileStatuses.size() != 1) {
+                throw new UserException("Broker files num error. path=" + path + ", broker=" + address
+                                                + ", files num: " + fileStatuses.size());
+            }
+
+            Preconditions.checkState(!fileStatuses.get(0).isIsDir());
+            long fileSize = fileStatuses.get(0).getSize();
+
+            // open reader
+            String clientId = FrontendOptions.getLocalHostAddress() + ":" + Config.rpc_port;
+            TBrokerOpenReaderRequest tOpenReaderRequest = new TBrokerOpenReaderRequest(
+                    TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getProperties());
+            TBrokerOpenReaderResponse tOpenReaderResponse = null;
+            try {
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            }
+            if (tOpenReaderResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker open reader failed. path=" + path + ", broker=" + address
+                                                + ", msg=" + tOpenReaderResponse.getOpStatus().getMessage());
+            }
+            fd = tOpenReaderResponse.getFd();
+
+            // read
+            TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
+                    TBrokerVersion.VERSION_ONE, fd, 0, fileSize);
+            TBrokerReadResponse tReadResponse = null;
+            try {
+                tReadResponse = client.pread(tPReadRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tReadResponse = client.pread(tPReadRequest);
+            }
+            if (tReadResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker read failed. path=" + path + ", broker=" + address
+                                                + ", msg=" + tReadResponse.getOpStatus().getMessage());
+            }
+            failed = false;
+            return tReadResponse.getData();

Review comment:
       ok
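
For reference, the hunk above repeats the same shape three times (for listPath, openReader and pread): make the call, reopen the client on a transport exception, then make the call once more. A minimal sketch of that retry-once pattern follows. It is not code from the pull request: the class name `RetryOnce` and the method `call` are hypothetical, and it catches any `Exception` where the diff catches only `TException`, so that the example stays self-contained without Thrift on the classpath.

```java
import java.util.concurrent.Callable;

// Hypothetical helper, not part of BrokerUtil: illustrates "try, reopen, try once more".
final class RetryOnce {
    private RetryOnce() {}

    /**
     * Runs op. If the first attempt throws, runs reopen and attempts op exactly
     * one more time. A failure on the second attempt propagates to the caller.
     */
    static <T> T call(Callable<T> op, Runnable reopen) throws Exception {
        try {
            return op.call();
        } catch (Exception first) {
            // Assumption: any exception is treated like a transport error here;
            // the diff above only retries on TException.
            reopen.run();
            return op.call();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String result = call(() -> {
            // Fail on the first attempt only, to simulate a broken connection.
            if (attempts[0]++ == 0) {
                throw new java.io.IOException("simulated transport error");
            }
            return "data";
        }, () -> System.out.println("reopening client"));
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

As in the diff, the second attempt is not guarded, so a persistent failure still surfaces to the caller rather than being retried indefinitely.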




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


