You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2018/12/03 21:44:29 UTC
[incubator-pinot] 01/01: Changing segmentCommitEnd to support deep
storage
This is an automated email from the ASF dual-hosted git repository.
jenniferdai pushed a commit to branch segmentcommitend
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 625482337f87474fce3faeaf49663b96ddda660a
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Mon Dec 3 10:46:27 2018 -0800
Changing segmentCommitEnd to support deep storage
---
.../linkedin/pinot/controller/ControllerConf.java | 2 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 49 ++++++++++++----------
2 files changed, 28 insertions(+), 23 deletions(-)
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index e18f13a..f7214bd 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -149,7 +149,7 @@ public class ControllerConf extends PropertiesConfiguration {
}
public String getLocalTempDir() {
- return getString(LOCAL_TEMP_DIR, null);
+ return getString(LOCAL_TEMP_DIR, getDataDir());
}
public void setPinotFSFactoryClasses(Configuration pinotFSFactoryClasses) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9cedc35..01da549 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -59,11 +59,12 @@ import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
import com.linkedin.pinot.core.segment.index.ColumnMetadata;
import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.filesystem.PinotFS;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.net.URI;
-import java.net.URISyntaxException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -349,11 +350,11 @@ public class PinotLLCRealtimeSegmentManager {
public boolean commitSegmentFile(String tableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
String segmentName = committingSegmentDescriptor.getSegmentName();
String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
- File segmentFile = convertURIToSegmentLocation(segmentLocation);
-
- File baseDir = new File(_controllerConf.getDataDir());
- File tableDir = new File(baseDir, tableName);
- File fileToMoveTo = new File(tableDir, segmentName);
+ URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation);
+ URI baseDirURI = ControllerConf.getUriFromPath(_controllerConf.getDataDir());
+ URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", _controllerConf.getDataDir(), tableName));
+ URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
+ PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
if (!isConnected() || !isLeader()) {
// We can potentially log a different value than what we saw ....
@@ -364,27 +365,24 @@ public class PinotLLCRealtimeSegmentManager {
}
try {
- com.linkedin.pinot.common.utils.FileUtils.moveFileWithOverwrite(segmentFile, fileToMoveTo);
+ pinotFS.move(segmentFileURI, uriToMoveTo, true);
} catch (Exception e) {
- LOGGER.error("Could not move {} to {}", segmentFile, segmentName, e);
+ LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, e);
return false;
}
- for (File file : tableDir.listFiles()) {
- if (file.getName().startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
- LOGGER.warn("Deleting " + file);
- FileUtils.deleteQuietly(file);
- }
- }
- return true;
- }
- private static File convertURIToSegmentLocation(String segmentLocation) {
try {
- URI uri = new URI(segmentLocation);
- return new File(uri.getPath());
- } catch (URISyntaxException e) {
- throw new RuntimeException("Could not convert URI " + segmentLocation + " to segment location", e);
+ for (String uri : pinotFS.listFiles(tableDirURI, true)) {
+ if (uri.startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
+ LOGGER.warn("Deleting " + uri);
+ pinotFS.delete(new URI(uri), true);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Could not tmp segment files");
}
+
+ return true;
}
/**
@@ -652,12 +650,19 @@ public class PinotLLCRealtimeSegmentManager {
protected SegmentMetadataImpl extractSegmentMetadata(final String rawTableName, final String segmentNameStr) {
String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), rawTableName);
String segFileStr = StringUtil.join("/", baseDirStr, segmentNameStr);
- String tempMetadataDirStr = StringUtil.join("/", baseDirStr, segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+
+ String localTempDir = _controllerConf.getLocalTempDir();
+ String tempMetadataDirStr = StringUtil.join("/", localTempDir, segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+
+ PinotFS pinotFS = PinotFSFactory.create(ControllerConf.getUriFromPath(baseDirStr).getScheme());
+
File tempMetadataDir = new File(tempMetadataDirStr);
try {
Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
+ pinotFS.copyToLocalFile(ControllerConf.getUriFromPath(segFileStr), new File(tempMetadataDirStr));
+
// Extract metadata.properties
InputStream metadataPropertiesInputStream =
TarGzCompressionUtils.unTarOneFile(new FileInputStream(new File(segFileStr)),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org