You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2018/11/26 18:44:31 UTC

[incubator-pinot] branch rt updated (29d78c5 -> 830e443)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a change to branch rt
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


 discard 29d78c5  Adding pluggable storage support for realtime split commit
     new 830e443  Adding pluggable storage support for realtime upload

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force) and generates a
repository containing something like this:

 * -- * -- B -- O -- O -- O   (29d78c5)
            \
             N -- N -- N   refs/heads/rt (830e443)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../controller/api/resources/LLCSegmentCompletionHandlers.java    | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Adding pluggable storage support for realtime upload

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch rt
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 830e4437e3ee8080af3e9304bdd235f9ceeba30e
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Mon Nov 26 10:38:11 2018 -0800

    Adding pluggable storage support for realtime upload
---
 .../resources/LLCSegmentCompletionHandlers.java    | 58 ++++++++++++++++------
 1 file changed, 43 insertions(+), 15 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 38c0dd7..6832b85 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -18,9 +18,13 @@ package com.linkedin.pinot.controller.api.resources;
 import com.google.common.annotations.VisibleForTesting;
 import com.linkedin.pinot.common.protocols.SegmentCompletionProtocol;
 import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.common.utils.StringUtil;
 import com.linkedin.pinot.controller.ControllerConf;
 import com.linkedin.pinot.controller.helix.core.realtime.SegmentCompletionManager;
 import com.linkedin.pinot.controller.util.SegmentCompletionUtils;
+import com.linkedin.pinot.filesystem.LocalPinotFS;
+import com.linkedin.pinot.filesystem.PinotFS;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -39,7 +43,6 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import org.apache.commons.httpclient.URI;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.glassfish.jersey.media.multipart.FormDataBodyPart;
 import org.glassfish.jersey.media.multipart.FormDataMultiPart;
@@ -300,27 +303,48 @@ public class LLCSegmentCompletionHandlers {
       FormDataBodyPart bodyPart = map.get(name).get(0);
 
       FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
-      File tmpFile = new File(provider.getFileUploadTmpDir(), name + "." + UUID.randomUUID().toString());
-      tmpFile.deleteOnExit();
 
+      String tmpFilePath = StringUtil.join("/", provider.getFileUploadTmpDirURI().toString(), name + "." + UUID.randomUUID().toString());
+      java.net.URI tmpFileURI = ControllerConf.getUriFromPath(tmpFilePath);
+
+      PinotFS pinotFS = PinotFSFactory.create(tmpFileURI.getScheme());
+
+      File localTmpFile = new File(provider.getFileUploadTmpDir(), name + "." + UUID.randomUUID().toString());
+      localTmpFile.deleteOnExit();
+
+      // Copy multipart to local
       try (InputStream inputStream = bodyPart.getValueAs(InputStream.class);
-          OutputStream outputStream = new FileOutputStream(tmpFile)) {
+          OutputStream outputStream = new FileOutputStream(localTmpFile)) {
         IOUtils.copyLarge(inputStream, outputStream);
       }
 
+      // If remote, will need to copy tmp file to remote storage
+      try {
+        if ((!(pinotFS instanceof LocalPinotFS))) {
+          pinotFS.copyFromLocalFile(localTmpFile, tmpFileURI);
+        }
+      } catch (Exception e) {
+        pinotFS.delete(tmpFileURI, true);
+        LOGGER.error("Could not copy from local to remote storage");
+      }
+
       LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
       final String rawTableName = llcSegmentName.getTableName();
-      final File tableDir = new File(provider.getBaseDataDir(), rawTableName);
-      File segmentFile;
+      final java.net.URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName));
+      java.net.URI segmentFileURI;
       if (isSplitCommit) {
         String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
-        segmentFile = new File(tableDir, uniqueSegmentFileName);
+        segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), uniqueSegmentFileName));
       } else {
-        segmentFile = new File(tableDir, segmentName);
+        segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
       }
 
       if (isSplitCommit) {
-        FileUtils.moveFile(tmpFile, segmentFile);
+        try {
+          pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+        } catch (Exception e) {
+          LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
+        }
       } else {
         // Multiple threads can reach this point at the same time, if the following scenario happens
         // The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in
@@ -337,16 +361,20 @@ public class LLCSegmentCompletionHandlers {
         // For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
         // be used.
         synchronized (SegmentCompletionManager.getInstance()) {
-          if (segmentFile.exists()) {
-            LOGGER.warn("Segment file {} exists. Replacing with upload from {}", segmentFile.getAbsolutePath(),
+          if (pinotFS.exists(segmentFileURI)) {
+            LOGGER.warn("Segment file {} exists. Replacing with upload from {}", segmentFileURI.toString(),
                 instanceId);
-            FileUtils.deleteQuietly(segmentFile);
+            pinotFS.delete(segmentFileURI, true);
+          }
+          try {
+            pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+          } catch (Exception e) {
+            LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
           }
-          FileUtils.moveFile(tmpFile, segmentFile);
         }
       }
-      LOGGER.info("Moved file {} to {}", tmpFile.getAbsolutePath(), segmentFile.getAbsolutePath());
-      return new URI(SCHEME + segmentFile.getAbsolutePath(), /* boolean escaped */ false).toString();
+      LOGGER.info("Moved file {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
+      return new URI(SCHEME + segmentFileURI.toString(), /* boolean escaped */ false).toString();
     } catch (InvalidControllerConfigException e) {
       LOGGER.error("Invalid controller config exception from instance {} for segment {}", instanceId, segmentName, e);
       return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org