You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pinot.apache.org by je...@apache.org on 2018/11/26 18:44:32 UTC

[incubator-pinot] 01/01: Adding pluggable storage support for realtime upload

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch rt
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 830e4437e3ee8080af3e9304bdd235f9ceeba30e
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Mon Nov 26 10:38:11 2018 -0800

    Adding pluggable storage support for realtime upload
---
 .../resources/LLCSegmentCompletionHandlers.java    | 58 ++++++++++++++++------
 1 file changed, 43 insertions(+), 15 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 38c0dd7..6832b85 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -18,9 +18,13 @@ package com.linkedin.pinot.controller.api.resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.pinot.common.protocols.SegmentCompletionProtocol;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.common.utils.StringUtil;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.realtime.SegmentCompletionManager;
 import com.linkedin.pinot.controller.util.SegmentCompletionUtils;
+import com.linkedin.pinot.filesystem.LocalPinotFS;
+import com.linkedin.pinot.filesystem.PinotFS;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -39,7 +43,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import org.apache.commons.httpclient.URI;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
@@ -300,27 +303,48 @@ public class LLCSegmentCompletionHandlers {
       FormDataBodyPart bodyPart = map.get(name).get(0);
 
       FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
-      File tmpFile = new File(provider.getFileUploadTmpDir(), name + "." + UUID.randomUUID().toString());
-      tmpFile.deleteOnExit();
 
+      String tmpFilePath = StringUtil.join("/", provider.getFileUploadTmpDirURI().toString(), name + "." + UUID.randomUUID().toString());
+      java.net.URI tmpFileURI = ControllerConf.getUriFromPath(tmpFilePath);
+
+      PinotFS pinotFS = PinotFSFactory.create(tmpFileURI.getScheme());
+
+      File localTmpFile = new File(provider.getFileUploadTmpDir(), name + "." + UUID.randomUUID().toString());
+      localTmpFile.deleteOnExit();
+
+      // Copy multipart to local
       try (InputStream inputStream = bodyPart.getValueAs(InputStream.class);
-          OutputStream outputStream = new FileOutputStream(tmpFile)) {
+          OutputStream outputStream = new FileOutputStream(localTmpFile)) {
         IOUtils.copyLarge(inputStream, outputStream);
       }
 
+      // If remote, will need to copy tmp file to remote storage
+      try {
+        if ((!(pinotFS instanceof LocalPinotFS))) {
+          pinotFS.copyFromLocalFile(localTmpFile, tmpFileURI);
+        }
+      } catch (Exception e) {
+        pinotFS.delete(tmpFileURI, true);
+        LOGGER.error("Could not copy from local to remote storage");
+      }
+
       LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
       final String rawTableName = llcSegmentName.getTableName();
-      final File tableDir = new File(provider.getBaseDataDir(), rawTableName);
-      File segmentFile;
+      final java.net.URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName));
+      java.net.URI segmentFileURI;
       if (isSplitCommit) {
         String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
-        segmentFile = new File(tableDir, uniqueSegmentFileName);
+        segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), uniqueSegmentFileName));
       } else {
-        segmentFile = new File(tableDir, segmentName);
+        segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
       }
 
       if (isSplitCommit) {
-        FileUtils.moveFile(tmpFile, segmentFile);
+        try {
+          pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+        } catch (Exception e) {
+          LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
+        }
       } else {
         // Multiple threads can reach this point at the same time, if the following scenario happens
         // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in
@@ -337,16 +361,20 @@ public class LLCSegmentCompletionHandlers {
         // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
         // be used.
         synchronized (SegmentCompletionManager.getInstance()) {
-          if (segmentFile.exists()) {
-            LOGGER.warn("Segment file {} exists. Replacing with upload from {}", segmentFile.getAbsolutePath(),
+          if (pinotFS.exists(segmentFileURI)) {
+            LOGGER.warn("Segment file {} exists. Replacing with upload from {}", segmentFileURI.toString(),
                 instanceId);
-            FileUtils.deleteQuietly(segmentFile);
+            pinotFS.delete(segmentFileURI, true);
+          }
+          try {
+            pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+          } catch (Exception e) {
+            LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
           }
-          FileUtils.moveFile(tmpFile, segmentFile);
         }
       }
-      LOGGER.info("Moved file {} to {}", tmpFile.getAbsolutePath(), segmentFile.getAbsolutePath());
-      return new URI(SCHEME + segmentFile.getAbsolutePath(), /* boolean escaped */ false).toString();
+      LOGGER.info("Moved file {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
+      return new URI(SCHEME + segmentFileURI.toString(), /* boolean escaped */ false).toString();
     } catch (InvalidControllerConfigException e) {
       LOGGER.error("Invalid controller config exception from instance {} for segment {}", instanceId, segmentName, e);
       return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org