You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2018/11/26 18:44:32 UTC
[incubator-pinot] 01/01: Adding pluggable storage support for realtime upload
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch rt
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 830e4437e3ee8080af3e9304bdd235f9ceeba30e
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Mon Nov 26 10:38:11 2018 -0800
Adding pluggable storage support for realtime upload
---
.../resources/LLCSegmentCompletionHandlers.java | 58 ++++++++++++++++------
1 file changed, 43 insertions(+), 15 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 38c0dd7..6832b85 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -18,9 +18,13 @@ package com.linkedin.pinot.controller.api.resources;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.pinot.common.protocols.SegmentCompletionProtocol;
import com.linkedin.pinot.common.utils.LLCSegmentName;
+import com.linkedin.pinot.common.utils.StringUtil;
import com.linkedin.pinot.controller.ControllerConf;
import com.linkedin.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import com.linkedin.pinot.controller.util.SegmentCompletionUtils;
+import com.linkedin.pinot.filesystem.LocalPinotFS;
+import com.linkedin.pinot.filesystem.PinotFS;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -39,7 +43,6 @@ import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import org.apache.commons.httpclient.URI;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.glassfish.jersey.media.multipart.FormDataBodyPart;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
@@ -300,27 +303,48 @@ public class LLCSegmentCompletionHandlers {
FormDataBodyPart bodyPart = map.get(name).get(0);
FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
- File tmpFile = new File(provider.getFileUploadTmpDir(), name + "." + UUID.randomUUID().toString());
- tmpFile.deleteOnExit();
+ String tmpFilePath = StringUtil.join("/", provider.getFileUploadTmpDirURI().toString(), name + "." + UUID.randomUUID().toString());
+ java.net.URI tmpFileURI = ControllerConf.getUriFromPath(tmpFilePath);
+
+ PinotFS pinotFS = PinotFSFactory.create(tmpFileURI.getScheme());
+
+ File localTmpFile = new File(provider.getFileUploadTmpDir(), name + "." + UUID.randomUUID().toString());
+ localTmpFile.deleteOnExit();
+
+ // Copy multipart to local
try (InputStream inputStream = bodyPart.getValueAs(InputStream.class);
- OutputStream outputStream = new FileOutputStream(tmpFile)) {
+ OutputStream outputStream = new FileOutputStream(localTmpFile)) {
IOUtils.copyLarge(inputStream, outputStream);
}
+ // If remote, will need to copy tmp file to remote storage
+ try {
+ if ((!(pinotFS instanceof LocalPinotFS))) {
+ pinotFS.copyFromLocalFile(localTmpFile, tmpFileURI);
+ }
+ } catch (Exception e) {
+ pinotFS.delete(tmpFileURI, true);
+ LOGGER.error("Could not copy from local to remote storage");
+ }
+
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
final String rawTableName = llcSegmentName.getTableName();
- final File tableDir = new File(provider.getBaseDataDir(), rawTableName);
- File segmentFile;
+ final java.net.URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName));
+ java.net.URI segmentFileURI;
if (isSplitCommit) {
String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
- segmentFile = new File(tableDir, uniqueSegmentFileName);
+ segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), uniqueSegmentFileName));
} else {
- segmentFile = new File(tableDir, segmentName);
+ segmentFileURI = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
}
if (isSplitCommit) {
- FileUtils.moveFile(tmpFile, segmentFile);
+ try {
+ pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+ } catch (Exception e) {
+ LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
+ }
} else {
// Multiple threads can reach this point at the same time, if the following scenario happens
// The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in
@@ -337,16 +361,20 @@ public class LLCSegmentCompletionHandlers {
// For now, we live with these corner cases. Once we have split-commit enabled and working, this code will no longer
// be used.
synchronized (SegmentCompletionManager.getInstance()) {
- if (segmentFile.exists()) {
- LOGGER.warn("Segment file {} exists. Replacing with upload from {}", segmentFile.getAbsolutePath(),
+ if (pinotFS.exists(segmentFileURI)) {
+ LOGGER.warn("Segment file {} exists. Replacing with upload from {}", segmentFileURI.toString(),
instanceId);
- FileUtils.deleteQuietly(segmentFile);
+ pinotFS.delete(segmentFileURI, true);
+ }
+ try {
+ pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
+ } catch (Exception e) {
+ LOGGER.error("Could not copy from {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
}
- FileUtils.moveFile(tmpFile, segmentFile);
}
}
- LOGGER.info("Moved file {} to {}", tmpFile.getAbsolutePath(), segmentFile.getAbsolutePath());
- return new URI(SCHEME + segmentFile.getAbsolutePath(), /* boolean escaped */ false).toString();
+ LOGGER.info("Moved file {} to {}", localTmpFile.getAbsolutePath(), segmentFileURI.toString());
+ return new URI(SCHEME + segmentFileURI.toString(), /* boolean escaped */ false).toString();
} catch (InvalidControllerConfigException e) {
LOGGER.error("Invalid controller config exception from instance {} for segment {}", instanceId, segmentName, e);
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org