Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/06/17 14:04:11 UTC

[GitHub] [incubator-doris] morningman commented on a change in pull request #3716: [Spark load][Fe 5/6] Fe submit spark etl job

morningman commented on a change in pull request #3716:
URL: https://github.com/apache/incubator-doris/pull/3716#discussion_r437874451



##########
File path: fe/src/main/java/org/apache/doris/common/Pair.java
##########
@@ -25,7 +27,9 @@
 public class Pair<F, S> {
     public static PairComparator<Pair<?, Comparable>> PAIR_VALUE_COMPARATOR = new PairComparator<>();
 
+    @SerializedName(value = "first")

Review comment:
       I checked; this does not work.

##########
File path: fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
##########
@@ -127,6 +140,76 @@ private void setResourceInfo() throws DdlException {
         brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties);
     }
 
+    @Override
+    public void beginTxn()
+            throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException {
+       transactionId = Catalog.getCurrentGlobalTransactionMgr()
+                .beginTransaction(dbId, Lists.newArrayList(fileGroupAggInfo.getAllTableIds()), label, null,
+                                  new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
+                                  LoadJobSourceType.FRONTEND, id, timeoutSecond);
+    }
+
+    @Override
+    protected void unprotectedExecuteJob() throws LoadException {
+        // create pending task
+        LoadTask task = new SparkLoadPendingTask(this, fileGroupAggInfo.getAggKeyToFileGroups(),
+                                                 sparkResource, brokerDesc);
+        task.init();
+        idToTasks.put(task.getSignature(), task);
+        Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(task);
+    }
+
+    @Override
+    public void onTaskFinished(TaskAttachment attachment) {
+        if (attachment instanceof SparkPendingTaskAttachment) {
+            onPendingTaskFinished((SparkPendingTaskAttachment) attachment);
+        }
+    }
+
+    private void onPendingTaskFinished(SparkPendingTaskAttachment attachment) {
+        writeLock();
+        try {
+            // check if job has been cancelled
+            if (isTxnDone()) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                                 .add("state", state)
+                                 .add("error_msg", "this task will be ignored when job is: " + state)
+                                 .build());
+                return;
+            }
+
+            if (finishedTaskIds.contains(attachment.getTaskId())) {
+                LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+                                 .add("task_id", attachment.getTaskId())
+                                 .add("error_msg", "this is a duplicated callback of pending task "
+                                         + "when broker already has loading task")
+                                 .build());
+                return;
+            }
+
+            // add task id into finishedTaskIds
+            finishedTaskIds.add(attachment.getTaskId());
+
+            sparkAppHandle = attachment.getHandle();
+            appId = attachment.getAppId();
+            etlOutputPath = attachment.getOutputPath();
+
+            executeEtl();
+            // log etl state
+            unprotectedLogUpdateStateInfo();
+        } finally {
+            writeUnlock();
+        }
+    }
+
+    /**
+     * update etl start time and state in spark load job
+     */
+    private void executeEtl() {
+        etlStartTimestamp = System.currentTimeMillis();
+        state = JobState.ETL;

Review comment:
       Print a log here to trace the state change.
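
       A minimal sketch of what such a trace log could look like. Everything here is a self-contained stand-in, not the actual SparkLoadJob code: the class `StateChangeLogSketch`, the helper `describeTransition`, the message wording, and the use of `java.util.logging` are assumptions for illustration. The real method would presumably use the existing `LOG` and `LogBuilder` already used elsewhere in this file.

```java
import java.util.logging.Logger;

// Hypothetical stand-in for the job class; only the fields touched by executeEtl() are modeled.
final class StateChangeLogSketch {
    enum JobState { PENDING, ETL }

    private static final Logger LOG = Logger.getLogger(StateChangeLogSketch.class.getName());

    private final long id;
    private JobState state = JobState.PENDING;
    private long etlStartTimestamp;

    StateChangeLogSketch(long id) {
        this.id = id;
    }

    // Builds the trace message; kept separate so the wording can be checked without a logger.
    static String describeTransition(long jobId, JobState from, JobState to) {
        return "load job " + jobId + " state changed: " + from + " -> " + to;
    }

    // Updates the ETL start time and state, then logs the transition.
    void executeEtl() {
        JobState previous = state;
        etlStartTimestamp = System.currentTimeMillis();
        state = JobState.ETL;
        LOG.info(describeTransition(id, previous, state));
    }

    JobState getState() {
        return state;
    }

    public static void main(String[] args) {
        StateChangeLogSketch job = new StateChangeLogSketch(10001L);
        job.executeEtl();
    }
}
```

       Capturing the previous state before the assignment keeps both ends of the transition in one log line, which makes the job's history easier to follow when grepping by job id.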

##########
File path: fe/src/main/java/org/apache/doris/common/util/BrokerUtil.java
##########
@@ -139,4 +148,352 @@ public static String printBroker(String brokerName, TNetworkAddress address) {
         return Lists.newArrayList(columns);
     }
 
+    /**
+     * Read binary data from path with broker
+     * @param path
+     * @param brokerDesc
+     * @return byte[]
+     * @throws UserException if broker op failed or not only one file
+     */
+    public static byte[] readFile(String path, BrokerDesc brokerDesc) throws UserException {
+        TNetworkAddress address = getAddress(brokerDesc);
+        TPaloBrokerService.Client client = borrowClient(address);
+        boolean failed = true;
+        TBrokerFD fd = null;
+        try {
+            // get file size
+            TBrokerListPathRequest request = new TBrokerListPathRequest(
+                    TBrokerVersion.VERSION_ONE, path, false, brokerDesc.getProperties());
+            TBrokerListResponse tBrokerListResponse = null;
+            try {
+                tBrokerListResponse = client.listPath(request);
+            } catch (TException e) {
+                reopenClient(client);
+                tBrokerListResponse = client.listPath(request);
+            }
+            if (tBrokerListResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker list path failed. path=" + path + ", broker=" + address
+                                                + ",msg=" + tBrokerListResponse.getOpStatus().getMessage());
+            }
+            List<TBrokerFileStatus> fileStatuses = tBrokerListResponse.getFiles();
+            if (fileStatuses.size() != 1) {
+                throw new UserException("Broker files num error. path=" + path + ", broker=" + address
+                                                + ", files num: " + fileStatuses.size());
+            }
+
+            Preconditions.checkState(!fileStatuses.get(0).isIsDir());
+            long fileSize = fileStatuses.get(0).getSize();
+
+            // open reader
+            String clientId = FrontendOptions.getLocalHostAddress() + ":" + Config.rpc_port;
+            TBrokerOpenReaderRequest tOpenReaderRequest = new TBrokerOpenReaderRequest(
+                    TBrokerVersion.VERSION_ONE, path, 0, clientId, brokerDesc.getProperties());
+            TBrokerOpenReaderResponse tOpenReaderResponse = null;
+            try {
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tOpenReaderResponse = client.openReader(tOpenReaderRequest);
+            }
+            if (tOpenReaderResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker open reader failed. path=" + path + ", broker=" + address
+                                                + ", msg=" + tOpenReaderResponse.getOpStatus().getMessage());
+            }
+            fd = tOpenReaderResponse.getFd();
+
+            // read
+            TBrokerPReadRequest tPReadRequest = new TBrokerPReadRequest(
+                    TBrokerVersion.VERSION_ONE, fd, 0, fileSize);
+            TBrokerReadResponse tReadResponse = null;
+            try {
+                tReadResponse = client.pread(tPReadRequest);
+            } catch (TException e) {
+                reopenClient(client);
+                tReadResponse = client.pread(tPReadRequest);
+            }
+            if (tReadResponse.getOpStatus().getStatusCode() != TBrokerOperationStatusCode.OK) {
+                throw new UserException("Broker read failed. path=" + path + ", broker=" + address
+                                                + ", msg=" + tReadResponse.getOpStatus().getMessage());
+            }
+            failed = false;
+            return tReadResponse.getData();

Review comment:
       The broker's pread() method currently does not guarantee that it reads the specified length of data.
   #3881 is trying to solve this problem; this is just a reminder.
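
       Until that is resolved, one common way to guard against short reads is to loop until the expected number of bytes has arrived. The sketch below is only an illustration of that idea, not the fix proposed in #3881: `PositionalReader`, `readFully`, and `chunked` are hypothetical names, and the interface stands in for a broker call that may return fewer bytes than requested.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

final class PreadLoopSketch {
    // Hypothetical stand-in for a positional read that may return fewer bytes than requested.
    interface PositionalReader {
        byte[] pread(long offset, long length);
    }

    // Calls pread repeatedly, advancing the offset, until fileSize bytes have been collected.
    static byte[] readFully(PositionalReader reader, long fileSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        long offset = 0;
        while (offset < fileSize) {
            byte[] chunk = reader.pread(offset, fileSize - offset);
            if (chunk == null || chunk.length == 0) {
                // No progress: fail instead of looping forever or returning truncated data.
                throw new IllegalStateException(
                        "short read: got " + offset + " of " + fileSize + " bytes");
            }
            out.write(chunk, 0, chunk.length);
            offset += chunk.length;
        }
        return out.toByteArray();
    }

    // Demo helper: serves an in-memory array in pieces of at most maxChunk bytes.
    static PositionalReader chunked(byte[] data, int maxChunk) {
        return (offset, length) -> {
            int from = (int) Math.min(offset, data.length);
            int to = (int) Math.min(data.length, from + Math.min(length, maxChunk));
            return Arrays.copyOfRange(data, from, to);
        };
    }

    public static void main(String[] args) {
        byte[] data = "spark etl output".getBytes(java.nio.charset.StandardCharsets.UTF_8);
        byte[] result = readFully(chunked(data, 5), data.length);
        System.out.println(Arrays.equals(data, result));
    }
}
```

       The empty-chunk check matters as much as the loop itself: without it, a reader that hits end of file early would either spin forever or silently return fewer bytes than the listed file size.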






