You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by je...@apache.org on 2018/12/03 21:44:29 UTC

[incubator-pinot] 01/01: Changing segmentCommitEnd to support deep storage

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch segmentcommitend
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 625482337f87474fce3faeaf49663b96ddda660a
Author: Jennifer Dai <jd...@linkedin.com>
AuthorDate: Mon Dec 3 10:46:27 2018 -0800

    Changing segmentCommitEnd to support deep storage
---
 .../linkedin/pinot/controller/ControllerConf.java  |  2 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 49 ++++++++++++----------
 2 files changed, 28 insertions(+), 23 deletions(-)

diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
index e18f13a..f7214bd 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/ControllerConf.java
@@ -149,7 +149,7 @@ public class ControllerConf extends PropertiesConfiguration {
   }
 
   public String getLocalTempDir() {
-    return getString(LOCAL_TEMP_DIR, null);
+    return getString(LOCAL_TEMP_DIR, getDataDir());
   }
 
   public void setPinotFSFactoryClasses(Configuration pinotFSFactoryClasses) {
diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9cedc35..01da549 100644
--- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -59,11 +59,12 @@ import com.linkedin.pinot.core.realtime.stream.StreamConfigProperties;
 import com.linkedin.pinot.core.segment.creator.impl.V1Constants;
 import com.linkedin.pinot.core.segment.index.ColumnMetadata;
 import com.linkedin.pinot.core.segment.index.SegmentMetadataImpl;
+import com.linkedin.pinot.filesystem.PinotFS;
+import com.linkedin.pinot.filesystem.PinotFSFactory;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -349,11 +350,11 @@ public class PinotLLCRealtimeSegmentManager {
   public boolean commitSegmentFile(String tableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
     String segmentName = committingSegmentDescriptor.getSegmentName();
     String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
-    File segmentFile = convertURIToSegmentLocation(segmentLocation);
-
-    File baseDir = new File(_controllerConf.getDataDir());
-    File tableDir = new File(baseDir, tableName);
-    File fileToMoveTo = new File(tableDir, segmentName);
+    URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation);
+    URI baseDirURI = ControllerConf.getUriFromPath(_controllerConf.getDataDir());
+    URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", _controllerConf.getDataDir(), tableName));
+    URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
+    PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
 
     if (!isConnected() || !isLeader()) {
       // We can potentially log a different value than what we saw ....
@@ -364,27 +365,24 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     try {
-      com.linkedin.pinot.common.utils.FileUtils.moveFileWithOverwrite(segmentFile, fileToMoveTo);
+      pinotFS.move(segmentFileURI, uriToMoveTo, true);
     } catch (Exception e) {
-      LOGGER.error("Could not move {} to {}", segmentFile, segmentName, e);
+      LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, e);
       return false;
     }
-    for (File file : tableDir.listFiles()) {
-      if (file.getName().startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
-        LOGGER.warn("Deleting " + file);
-        FileUtils.deleteQuietly(file);
-      }
-    }
-    return true;
-  }
 
-  private static File convertURIToSegmentLocation(String segmentLocation) {
     try {
-      URI uri = new URI(segmentLocation);
-      return new File(uri.getPath());
-    } catch (URISyntaxException e) {
-      throw new RuntimeException("Could not convert URI " + segmentLocation + " to segment location", e);
+      for (String uri : pinotFS.listFiles(tableDirURI, true)) {
+        if (uri.startsWith(SegmentCompletionUtils.getSegmentNamePrefix(segmentName))) {
+          LOGGER.warn("Deleting " + uri);
+          pinotFS.delete(new URI(uri), true);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Could not tmp segment files");
     }
+
+    return true;
   }
 
   /**
@@ -652,12 +650,19 @@ public class PinotLLCRealtimeSegmentManager {
   protected SegmentMetadataImpl extractSegmentMetadata(final String rawTableName, final String segmentNameStr) {
     String baseDirStr = StringUtil.join("/", _controllerConf.getDataDir(), rawTableName);
     String segFileStr = StringUtil.join("/", baseDirStr, segmentNameStr);
-    String tempMetadataDirStr = StringUtil.join("/", baseDirStr, segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+
+    String localTempDir = _controllerConf.getLocalTempDir();
+    String tempMetadataDirStr = StringUtil.join("/", localTempDir, segmentNameStr + METADATA_TEMP_DIR_SUFFIX);
+
+    PinotFS pinotFS = PinotFSFactory.create(ControllerConf.getUriFromPath(baseDirStr).getScheme());
+
     File tempMetadataDir = new File(tempMetadataDirStr);
 
     try {
       Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
 
+      pinotFS.copyToLocalFile(ControllerConf.getUriFromPath(segFileStr), new File(tempMetadataDirStr));
+
       // Extract metadata.properties
       InputStream metadataPropertiesInputStream =
           TarGzCompressionUtils.unTarOneFile(new FileInputStream(new File(segFileStr)),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org