You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/07/13 00:33:58 UTC
[incubator-pinot] branch master updated: Add URIUtils class to
handle URI/URL encoding/decoding (#4426)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f671072 Add URIUtils class to handle URI/URL encoding/decoding (#4426)
f671072 is described below
commit f67107202dfbc5ca766fc5fd4e1bbab9e503cb7d
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 12 17:33:53 2019 -0700
Add URIUtils class to handle URI/URL encoding/decoding (#4426)
Fix the double encoding bug of encoding URI without scheme
"/segments/segment %" ->
Without fix: "file:/segments/segment+%2525"
With fix: "file:/segments/segment+%25"
---
.../common/utils/FileUploadDownloadClient.java | 16 ++--
.../org/apache/pinot/common/utils/URIUtils.java | 88 +++++++++++++++++++++
.../org/apache/pinot/filesystem/LocalPinotFS.java | 91 ++++++++--------------
.../apache/pinot/common/utils/URIUtilsTest.java | 68 ++++++++++++++++
.../apache/pinot/controller/ControllerConf.java | 46 +----------
.../api/resources/FileUploadPathProvider.java | 5 +-
.../resources/LLCSegmentCompletionHandlers.java | 25 +++---
.../api/resources/PinotSegmentRestletResource.java | 20 ++---
.../PinotSegmentUploadRestletResource.java | 61 ++++++---------
.../helix/ControllerRequestURLBuilder.java | 21 +++--
.../helix/core/SegmentDeletionManager.java | 24 ++----
.../realtime/PinotLLCRealtimeSegmentManager.java | 34 ++++----
.../helix/core/realtime/SegmentCompletionTest.java | 7 +-
.../tests/PinotURIUploadIntegrationTest.java | 8 +-
.../tools/query/comparison/ClusterStarter.java | 4 +-
15 files changed, 280 insertions(+), 238 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index d3d02cd..24fcd95 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -26,10 +26,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
@@ -133,14 +131,16 @@ public class FileUploadDownloadClient implements Closeable {
public static URI getDeleteSegmentHttpUri(String host, int port, String rawTableName, String segmentName,
String tableType)
- throws URISyntaxException, UnsupportedEncodingException {
- return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"),
- OLD_SEGMENT_PATH, rawTableName + "/" + URLEncoder.encode(segmentName, "UTF-8") + TYPE_DELIMITER + tableType));
+ throws URISyntaxException {
+ return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), OLD_SEGMENT_PATH,
+ rawTableName + "/" + URIUtils.encode(segmentName) + TYPE_DELIMITER + tableType));
}
- public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName, String tableType) throws URISyntaxException {
- return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"),
- OLD_SEGMENT_PATH, rawTableName + TYPE_DELIMITER + tableType));
+ public static URI getRetrieveAllSegmentWithTableTypeHttpUri(String host, int port, String rawTableName,
+ String tableType)
+ throws URISyntaxException {
+ return new URI(StringUtil.join("/", StringUtils.chomp(HTTP + "://" + host + ":" + port, "/"), OLD_SEGMENT_PATH,
+ rawTableName + TYPE_DELIMITER + tableType));
}
public static URI getRetrieveSchemaHttpURI(String host, int port, String schemaName)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
new file mode 100644
index 0000000..b5f0aeb
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/URIUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.util.StringJoiner;
+
+
+public class URIUtils {
+ private URIUtils() {
+ }
+
+ /**
+ * Returns the URI for the given base path and optional parts, appends the local (file) scheme to the URI if no
+ * scheme exists. All the parts will be appended to the base path with the file separator.
+ */
+ public static URI getUri(String basePath, String... parts) {
+ String path = getPath(basePath, parts);
+ try {
+ URI uri = new URI(path);
+ if (uri.getScheme() != null) {
+ return uri;
+ } else {
+ return new URI(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME + ":" + path);
+ }
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Illegal URI path: " + path, e);
+ }
+ }
+
+ /**
+ * Returns the path for the given base path and optional parts. All the parts will be appended to the base path with
+ * the file separator.
+ */
+ public static String getPath(String basePath, String... parts) {
+ StringJoiner stringJoiner = new StringJoiner(File.separator);
+ stringJoiner.add(basePath);
+ for (String part : parts) {
+ stringJoiner.add(part);
+ }
+ return stringJoiner.toString();
+ }
+
+ /**
+ * Returns the download URL with the segment name encoded.
+ */
+ public static String constructDownloadUrl(String baseUrl, String rawTableName, String segmentName) {
+ return getPath(baseUrl, "segments", rawTableName, encode(segmentName));
+ }
+
+ public static String encode(String string) {
+ try {
+ return URLEncoder.encode(string, "UTF-8");
+ } catch (Exception e) {
+ // Should never happen
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static String decode(String string) {
+ try {
+ return URLDecoder.decode(string, "UTF-8");
+ } catch (Exception e) {
+ // Should never happen
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java b/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
index 7161031..4a6e263 100644
--- a/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
+++ b/pinot-common/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
@@ -20,18 +20,14 @@ package org.apache.pinot.filesystem;
import java.io.File;
import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.net.URI;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.pinot.common.utils.URIUtils;
/**
@@ -39,11 +35,6 @@ import org.slf4j.LoggerFactory;
* if access to the file is denied.
*/
public class LocalPinotFS extends PinotFS {
- private static final Logger LOGGER = LoggerFactory.getLogger(LocalPinotFS.class);
- private static final String DEFAULT_ENCODING = "UTF-8";
-
- public LocalPinotFS() {
- }
@Override
public void init(Configuration configuration) {
@@ -52,14 +43,14 @@ public class LocalPinotFS extends PinotFS {
@Override
public boolean mkdir(URI uri)
throws IOException {
- FileUtils.forceMkdir(new File(decodeURI(uri.getRawPath())));
+ FileUtils.forceMkdir(toFile(uri));
return true;
}
@Override
public boolean delete(URI segmentUri, boolean forceDelete)
throws IOException {
- File file = new File(decodeURI(segmentUri.getRawPath()));
+ File file = toFile(segmentUri);
if (file.isDirectory()) {
// Returns false if directory isn't empty
if (listFiles(segmentUri, false).length > 0 && !forceDelete) {
@@ -77,8 +68,8 @@ public class LocalPinotFS extends PinotFS {
@Override
protected boolean doMove(URI srcUri, URI dstUri)
throws IOException {
- File srcFile = new File(decodeURI(srcUri.getRawPath()));
- File dstFile = new File(decodeURI(dstUri.getRawPath()));
+ File srcFile = toFile(srcUri);
+ File dstFile = toFile(dstUri);
if (srcFile.isDirectory()) {
FileUtils.moveDirectory(srcFile, dstFile);
} else {
@@ -90,32 +81,18 @@ public class LocalPinotFS extends PinotFS {
@Override
public boolean copy(URI srcUri, URI dstUri)
throws IOException {
- File srcFile = new File(decodeURI(srcUri.getRawPath()));
- File dstFile = new File(decodeURI(dstUri.getRawPath()));
- if (dstFile.exists()) {
- FileUtils.deleteQuietly(dstFile);
- }
- if (srcFile.isDirectory()) {
- // Throws Exception on failure
- FileUtils.copyDirectory(srcFile, dstFile);
- } else {
- // Will create parent directories, throws Exception on failure
- FileUtils.copyFile(srcFile, dstFile);
- }
+ copy(toFile(srcUri), toFile(dstUri));
return true;
}
@Override
- public boolean exists(URI fileUri)
- throws IOException {
- File file = new File(decodeURI(fileUri.getRawPath()));
- return file.exists();
+ public boolean exists(URI fileUri) {
+ return toFile(fileUri).exists();
}
@Override
- public long length(URI fileUri)
- throws IOException {
- File file = new File(decodeURI(fileUri.getRawPath()));
+ public long length(URI fileUri) {
+ File file = toFile(fileUri);
if (file.isDirectory()) {
throw new IllegalArgumentException("File is directory");
}
@@ -125,7 +102,7 @@ public class LocalPinotFS extends PinotFS {
@Override
public String[] listFiles(URI fileUri, boolean recursive)
throws IOException {
- File file = new File(decodeURI(fileUri.getRawPath()));
+ File file = toFile(fileUri);
if (!recursive) {
return Arrays.stream(file.list()).map(s -> new File(file, s)).map(File::getAbsolutePath).toArray(String[]::new);
} else {
@@ -137,56 +114,52 @@ public class LocalPinotFS extends PinotFS {
@Override
public void copyToLocalFile(URI srcUri, File dstFile)
throws Exception {
- copy(srcUri, new URI(encodeURI(dstFile.getAbsolutePath())));
+ copy(toFile(srcUri), dstFile);
}
@Override
public void copyFromLocalFile(File srcFile, URI dstUri)
throws Exception {
- copy(new URI(encodeURI(srcFile.getAbsolutePath())), dstUri);
+ copy(srcFile, toFile(dstUri));
}
@Override
public boolean isDirectory(URI uri) {
- File file = new File(decodeURI(uri.getRawPath()));
- return file.isDirectory();
+ return toFile(uri).isDirectory();
}
@Override
public long lastModified(URI uri) {
- File file = new File(decodeURI(uri.getRawPath()));
- return file.lastModified();
+ return toFile(uri).lastModified();
}
@Override
public boolean touch(URI uri)
throws IOException {
- File file = new File(decodeURI(uri.getRawPath()));
- if (!exists(uri)) {
+ File file = toFile(uri);
+ if (!file.exists()) {
return file.createNewFile();
}
return file.setLastModified(System.currentTimeMillis());
}
- private String encodeURI(String uri) {
- String encodedStr;
- try {
- encodedStr = URLEncoder.encode(uri, DEFAULT_ENCODING);
- } catch (UnsupportedEncodingException e) {
- LOGGER.warn("Could not encode uri {}", uri);
- throw new RuntimeException(e);
- }
- return encodedStr;
+ private static File toFile(URI uri) {
+ // NOTE: Do not use new File(uri) because scheme might not exist and it does not decode '+' to ' '
+ // Do not use uri.getPath() because it does not decode '+' to ' '
+ return new File(URIUtils.decode(uri.getRawPath()));
}
- private String decodeURI(String uri) {
- String decodedStr;
- try {
- decodedStr = URLDecoder.decode(uri, DEFAULT_ENCODING);
- } catch (UnsupportedEncodingException e) {
- LOGGER.warn("Could not decode uri {}", uri);
- throw new RuntimeException(e);
+ private static void copy(File srcFile, File dstFile)
+ throws IOException {
+ if (dstFile.exists()) {
+ FileUtils.deleteQuietly(dstFile);
+ }
+ if (srcFile.isDirectory()) {
+ // Throws Exception on failure
+ FileUtils.copyDirectory(srcFile, dstFile);
+ } else {
+ // Will create parent directories, throws Exception on failure
+ FileUtils.copyFile(srcFile, dstFile);
}
- return decodedStr;
}
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
new file mode 100644
index 0000000..229cce0
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/URIUtilsTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.util.Random;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class URIUtilsTest {
+
+ @Test
+ public void testGetUri() {
+ assertEquals(URIUtils.getUri("http://foo/bar").toString(), "http://foo/bar");
+ assertEquals(URIUtils.getUri("http://foo/bar", "table").toString(), "http://foo/bar/table");
+ assertEquals(URIUtils.getUri("http://foo/bar", "table", "segment+%25").toString(),
+ "http://foo/bar/table/segment+%25");
+ assertEquals(URIUtils.getUri("/foo/bar", "table", "segment+%25").toString(), "file:/foo/bar/table/segment+%25");
+ assertEquals(URIUtils.getUri("file:/foo/bar", "table", "segment+%25").toString(),
+ "file:/foo/bar/table/segment+%25");
+ }
+
+ @Test
+ public void testGetPath() {
+ assertEquals(URIUtils.getPath("http://foo/bar"), "http://foo/bar");
+ assertEquals(URIUtils.getPath("http://foo/bar", "table"), "http://foo/bar/table");
+ assertEquals(URIUtils.getPath("http://foo/bar", "table", "segment+%25"), "http://foo/bar/table/segment+%25");
+ assertEquals(URIUtils.getPath("/foo/bar", "table", "segment+%25"), "/foo/bar/table/segment+%25");
+ assertEquals(URIUtils.getPath("file:/foo/bar", "table", "segment+%25"), "file:/foo/bar/table/segment+%25");
+ }
+
+ @Test
+ public void testConstructDownloadUrl() {
+ assertEquals(URIUtils.constructDownloadUrl("http://foo/bar", "table", "segment"),
+ "http://foo/bar/segments/table/segment");
+ assertEquals(URIUtils.constructDownloadUrl("http://foo/bar", "table", "segment %"),
+ "http://foo/bar/segments/table/segment+%25");
+ }
+
+ @Test
+ public void testEncodeDecode() {
+ int numRounds = 1000;
+ int maxPartLength = 10;
+ Random random = new Random();
+ for (int i = 0; i < numRounds; i++) {
+ String randomString = RandomStringUtils.random(random.nextInt(maxPartLength + 1));
+ assertEquals(URIUtils.decode(URIUtils.encode(randomString)), randomString);
+ }
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 697fcf3..4e872bb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -19,10 +19,6 @@
package org.apache.pinot.controller;
import java.io.File;
-import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@@ -30,7 +26,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
-import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.filesystem.LocalPinotFS;
import org.slf4j.Logger;
@@ -59,9 +54,7 @@ public class ControllerConf extends PropertiesConfiguration {
private static final String CONTROLLER_MODE = "controller.mode";
public enum ControllerMode {
- DUAL,
- PINOT_ONLY,
- HELIX_ONLY
+ DUAL, PINOT_ONLY, HELIX_ONLY
}
public static class ControllerPeriodicTasksConf {
@@ -171,43 +164,6 @@ public class ControllerConf extends PropertiesConfiguration {
super();
}
- /**
- * Returns the URI for the given path, appends the local (file) scheme to the URI if no scheme exists.
- */
- public static URI getUriFromPath(String path) {
- try {
- URI uri = new URI(path);
- if (uri.getScheme() != null) {
- return uri;
- } else {
- return new URI(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME, path, null);
- }
- } catch (URISyntaxException e) {
- LOGGER.error("Could not construct uri from path {}", path);
- throw new RuntimeException(e);
- }
- }
-
- public static URI constructSegmentLocation(String baseDataDir, String tableName, String segmentName) {
- try {
- return getUriFromPath(StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8")));
- } catch (UnsupportedEncodingException e) {
- LOGGER
- .error("Could not construct segment location with baseDataDir {}, tableName {}, segmentName {}", baseDataDir,
- tableName, segmentName);
- throw new RuntimeException(e);
- }
- }
-
- public static String constructDownloadUrl(String tableName, String segmentName, String vip) {
- try {
- return StringUtil.join("/", vip, "segments", tableName, URLEncoder.encode(segmentName, "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- // Shouldn't happen
- throw new AssertionError("Encountered error while encoding in UTF-8 format", e);
- }
- }
-
public void setLocalTempDir(String localTempDir) {
setProperty(LOCAL_TEMP_DIR, localTempDir);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/FileUploadPathProvider.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/FileUploadPathProvider.java
index 055b19c..839c3da 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/FileUploadPathProvider.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/FileUploadPathProvider.java
@@ -24,6 +24,7 @@ import java.net.URI;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.filesystem.LocalPinotFS;
import org.apache.pinot.filesystem.PinotFS;
import org.apache.pinot.filesystem.PinotFSFactory;
@@ -55,7 +56,7 @@ public class FileUploadPathProvider {
StringUtils.stripEnd(dataDir, "/");
try {
// URIs that are allowed to be remote
- _baseDataDirURI = ControllerConf.getUriFromPath(dataDir);
+ _baseDataDirURI = URIUtils.getUri(dataDir);
LOGGER.info("Data directory: {}", _baseDataDirURI);
_schemasTmpDirURI = new URI(_baseDataDirURI + SCHEMAS_TEMP);
LOGGER.info("Schema temporary directory: {}", _schemasTmpDirURI);
@@ -70,7 +71,7 @@ public class FileUploadPathProvider {
LOGGER.info("Local temporary directory is not configured, use data directory as the local temporary directory");
_localTempDirURI = _baseDataDirURI;
} else {
- _localTempDirURI = ControllerConf.getUriFromPath(localTempDir);
+ _localTempDirURI = URIUtils.getUri(localTempDir);
}
LOGGER.info("Local temporary directory: {}", _localTempDirURI);
if (!_localTempDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
index 84df735..b6def85 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/LLCSegmentCompletionHandlers.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.api.resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -40,14 +41,13 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
-
-import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
@@ -155,8 +155,7 @@ public class LLCSegmentCompletionHandlers {
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withOffset(offset).withReason(stopReason);
LOGGER.info("Processing segmentStoppedConsuming:{}", requestParams.toString());
- SegmentCompletionProtocol.Response response =
- _segmentCompletionManager.segmentStoppedConsuming(requestParams);
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentStoppedConsuming(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentStoppedConsuming for segment:{} is:{}", segmentName, responseStr);
return responseStr;
@@ -185,8 +184,7 @@ public class LLCSegmentCompletionHandlers {
LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
- SegmentCompletionProtocol.Response response =
- _segmentCompletionManager.segmentCommitStart(requestParams);
+ SegmentCompletionProtocol.Response response = _segmentCompletionManager.segmentCommitStart(requestParams);
final String responseStr = response.toJsonString();
LOGGER.info("Response to segmentCommitStart for segment:{} is:{}", segmentName, responseStr);
return responseStr;
@@ -283,9 +281,9 @@ public class LLCSegmentCompletionHandlers {
try {
FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
final String rawTableName = new LLCSegmentName(segmentName).getTableName();
- URI segmentFileURI = ControllerConf.getUriFromPath(
- StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, segmentName));
- PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
+ URI segmentFileURI =
+ URIUtils.getUri(provider.getBaseDataDirURI().toString(), rawTableName, URIUtils.encode(segmentName));
+ PinotFS pinotFS = PinotFSFactory.create(segmentFileURI.getScheme());
// Multiple threads can reach this point at the same time, if the following scenario happens
// The server that was asked to commit did so very slowly (due to network speeds). Meanwhile the FSM in
// SegmentCompletionManager timed out, and allowed another server to commit, which did so very quickly (somehow
@@ -428,8 +426,8 @@ public class LLCSegmentCompletionHandlers {
try {
Preconditions.checkState(tempMetadataDir.mkdirs(), "Failed to create directory: %s", tempMetadataDirStr);
// Extract metadata.properties from the metadataFiles.
- if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr,
- V1Constants.MetadataKeys.METADATA_FILE_NAME, segmentNameStr)) {
+ if (!extractMetadataFromInputField(metadataFiles, tempMetadataDirStr, V1Constants.MetadataKeys.METADATA_FILE_NAME,
+ segmentNameStr)) {
return null;
}
// Extract creation.meta from the metadataFiles.
@@ -486,7 +484,7 @@ public class LLCSegmentCompletionHandlers {
File tempSegmentDataDir = new File(tempSegmentDataDirStr);
File segDstFile = new File(StringUtil.join("/", tempSegmentDataDirStr, segmentNameStr));
// Use PinotFS to copy the segment file to local fs for metadata extraction.
- PinotFS pinotFS = PinotFSFactory.create(ControllerConf.getUriFromPath(_controllerConf.getDataDir()).getScheme());
+ PinotFS pinotFS = PinotFSFactory.create(URIUtils.getUri(_controllerConf.getDataDir()).getScheme());
try {
Preconditions.checkState(tempSegmentDataDir.mkdirs(), "Failed to create directory: %s", tempSegmentDataDir);
pinotFS.copyToLocalFile(segmentLocation, segDstFile);
@@ -575,8 +573,7 @@ public class LLCSegmentCompletionHandlers {
// See PinotLLCRealtimeSegmentManager.commitSegmentFile().
// TODO: move tmp file logic into SegmentCompletionUtils.
String uniqueSegmentFileName = SegmentCompletionUtils.generateSegmentFileName(segmentName);
- URI segmentFileURI = ControllerConf.getUriFromPath(
- StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName, uniqueSegmentFileName));
+ URI segmentFileURI = URIUtils.getUri(provider.getBaseDataDirURI().toString(), rawTableName, uniqueSegmentFileName);
PinotFS pinotFS = PinotFSFactory.create(provider.getBaseDataDirURI().getScheme());
pinotFS.copyFromLocalFile(localTmpFile, segmentFileURI);
return segmentFileURI;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 15cd6d5..6e6ecb0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -24,8 +24,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -54,6 +52,7 @@ import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.common.utils.URIUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,7 +137,7 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Name of segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "enable|disable|drop", required = false) @QueryParam("state") String stateStr,
@ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) {
- segmentName = checkGetEncodedParam(segmentName);
+ segmentName = URIUtils.decode(segmentName);
// segmentName will never be null,otherwise we would reach the method toggleStateOrListMetadataForAllSegments()
CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr);
StateType stateType = Constants.validateState(stateStr);
@@ -162,15 +161,6 @@ public class PinotSegmentRestletResource {
}
}
- private String checkGetEncodedParam(String encoded) {
- try {
- return URLDecoder.decode(encoded, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- String errStr = "Could not decode parameter '" + encoded + "'";
- throw new ControllerApplicationException(LOGGER, errStr, Response.Status.BAD_REQUEST);
- }
- }
-
@GET
@Path("tables/{tableName}/segments/metadata")
@Produces(MediaType.APPLICATION_JSON)
@@ -193,7 +183,7 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "realtime|offline", required = false) @QueryParam("type") String tableTypeStr) {
- segmentName = checkGetEncodedParam(segmentName);
+ segmentName = URIUtils.decode(segmentName);
CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr);
return listSegmentMetadataInternal(tableName, segmentName, tableType);
}
@@ -229,7 +219,7 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
- segmentName = checkGetEncodedParam(segmentName);
+ segmentName = URIUtils.decode(segmentName);
CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr);
return reloadSegmentForTable(tableName, segmentName, tableType);
}
@@ -254,7 +244,7 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
@ApiParam(value = "Name of segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
@ApiParam(value = "realtime|offline") @QueryParam("type") String tableTypeStr) {
- segmentName = checkGetEncodedParam(segmentName);
+ segmentName = URIUtils.decode(segmentName);
CommonConstants.Helix.TableType tableType = Constants.validateTableType(tableTypeStr);
return reloadSegmentForTable(tableName, segmentName, tableType);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
index 5259d9f..bbb34ac 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadRestletResource.java
@@ -28,11 +28,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
import java.net.InetAddress;
import java.net.URI;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -57,7 +54,6 @@ import javax.ws.rs.core.Response;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -67,7 +63,7 @@ import org.apache.pinot.common.segment.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerLeadershipManager;
@@ -185,13 +181,8 @@ public class PinotSegmentUploadRestletResource {
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
- try {
- segmentName = URLDecoder.decode(segmentName, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- String errStr = "Could not decode segment name '" + segmentName + "'";
- throw new ControllerApplicationException(LOGGER, errStr, Response.Status.BAD_REQUEST);
- }
- final File dataFile = new File(provider.getBaseDataDir(), StringUtil.join("/", tableName, segmentName));
+ segmentName = URIUtils.decode(segmentName);
+ File dataFile = new File(provider.getBaseDataDir(), String.join(File.separator, tableName, segmentName));
if (!dataFile.exists()) {
throw new ControllerApplicationException(LOGGER,
"Segment " + segmentName + " or table " + tableName + " not found", Response.Status.NOT_FOUND);
@@ -214,12 +205,7 @@ public class PinotSegmentUploadRestletResource {
if (tableType == null) {
throw new ControllerApplicationException(LOGGER, "Table type must not be null", Response.Status.BAD_REQUEST);
}
- try {
- segmentName = URLDecoder.decode(segmentName, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- String errStr = "Could not decode segment name '" + segmentName + "'";
- throw new ControllerApplicationException(LOGGER, errStr, Response.Status.BAD_REQUEST);
- }
+ segmentName = URIUtils.decode(segmentName);
PinotSegmentRestletResource
.toggleStateInternal(tableName, StateType.DROP, tableType, segmentName, _pinotHelixResourceManager);
@@ -267,8 +253,6 @@ public class PinotSegmentUploadRestletResource {
File tempEncryptedFile = null;
File tempDecryptedFile = null;
File tempSegmentDir = null;
- SegmentMetadata segmentMetadata;
- String zkDownloadUri = null;
try {
FileUploadPathProvider provider = new FileUploadPathProvider(_controllerConf);
String tempFileName = TMP_DIR_PREFIX + System.nanoTime();
@@ -288,6 +272,7 @@ public class PinotSegmentUploadRestletResource {
// TODO: Change when metadata upload added
String metadataProviderClass = DefaultMetadataExtractor.class.getName();
+ SegmentMetadata segmentMetadata;
switch (uploadType) {
case URI:
segmentMetadata =
@@ -304,19 +289,20 @@ public class PinotSegmentUploadRestletResource {
}
String rawTableName = segmentMetadata.getTableName();
+ String segmentName = segmentMetadata.getName();
+ String zkDownloadUri;
// This boolean is here for V1 segment upload, where we keep the segment in the downloadURI sent in the header.
// We will deprecate this behavior eventually.
if (!moveSegmentToFinalLocation) {
LOGGER.info("Setting zkDownloadUri to {} for segment {} of table {}, skipping move", currentSegmentLocationURI,
- segmentMetadata.getName(), rawTableName);
+ segmentName, rawTableName);
zkDownloadUri = currentSegmentLocationURI;
} else {
- zkDownloadUri = getZkDownloadURIForSegmentUpload(rawTableName, segmentMetadata, provider);
+ zkDownloadUri = getZkDownloadURIForSegmentUpload(provider, rawTableName, segmentName);
}
String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
- String segmentName = segmentMetadata.getName();
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(rawTableName);
LOGGER
.info("Processing upload request for segment: {} of table: {} from client: {}", segmentName, offlineTableName,
@@ -332,8 +318,7 @@ public class PinotSegmentUploadRestletResource {
completeZkOperations(enableParallelPushProtection, headers, tempEncryptedFile, provider, rawTableName,
segmentMetadata, segmentName, zkDownloadUri, moveSegmentToFinalLocation, segmentValidatorResponse);
- return new SuccessResponse(
- "Successfully uploaded segment: " + segmentMetadata.getName() + " of table: " + rawTableName);
+ return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + rawTableName);
} catch (WebApplicationException e) {
throw e;
} catch (Exception e) {
@@ -347,17 +332,16 @@ public class PinotSegmentUploadRestletResource {
}
}
- private String getZkDownloadURIForSegmentUpload(String rawTableName, SegmentMetadata segmentMetadata,
- FileUploadPathProvider provider)
- throws UnsupportedEncodingException {
- if (provider.getBaseDataDirURI().getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
- return ControllerConf.constructDownloadUrl(rawTableName, segmentMetadata.getName(), provider.getVip());
+ private String getZkDownloadURIForSegmentUpload(FileUploadPathProvider provider, String rawTableName,
+ String segmentName) {
+ URI baseDataDirURI = provider.getBaseDataDirURI();
+ if (baseDataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
+ return URIUtils.constructDownloadUrl(provider.getVip(), rawTableName, segmentName);
} else {
// Receiving .tar.gz segment upload for pluggable storage
- LOGGER.info("Using configured data dir {} for segment {} of table {}", _controllerConf.getDataDir(),
- segmentMetadata.getName(), rawTableName);
- return StringUtil.join("/", provider.getBaseDataDirURI().toString(), rawTableName,
- URLEncoder.encode(segmentMetadata.getName(), "UTF-8"));
+ LOGGER.info("Using configured data dir {} for segment {} of table {}", _controllerConf.getDataDir(), segmentName,
+ rawTableName);
+ return URIUtils.constructDownloadUrl(baseDataDirURI.toString(), rawTableName, segmentName);
}
}
@@ -388,12 +372,11 @@ public class PinotSegmentUploadRestletResource {
}
private void completeZkOperations(boolean enableParallelPushProtection, HttpHeaders headers, File tempDecryptedFile,
- FileUploadPathProvider provider, String rawTableName, SegmentMetadata segmentMetadata, String segmentName, String zkDownloadURI,
- boolean moveSegmentToFinalLocation, SegmentValidatorResponse segmentValidatorResponse)
+ FileUploadPathProvider provider, String rawTableName, SegmentMetadata segmentMetadata, String segmentName,
+ String zkDownloadURI, boolean moveSegmentToFinalLocation, SegmentValidatorResponse segmentValidatorResponse)
throws Exception {
- String finalSegmentPath = StringUtil
- .join("/", provider.getBaseDataDirURI().toString(), rawTableName, URLEncoder.encode(segmentName, "UTF-8"));
- URI finalSegmentLocationURI = new URI(finalSegmentPath);
+ URI finalSegmentLocationURI =
+ URIUtils.getUri(provider.getBaseDataDirURI().toString(), rawTableName, URIUtils.encode(segmentName));
ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
zkOperator.completeSegmentOperations(rawTableName, segmentMetadata, finalSegmentLocationURI, tempDecryptedFile,
enableParallelPushProtection, headers, zkDownloadURI, moveSegmentToFinalLocation, segmentValidatorResponse);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index 584a558..b3c4913 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -18,11 +18,10 @@
*/
package org.apache.pinot.controller.helix;
-import java.io.IOException;
-import java.net.URLEncoder;
import org.apache.avro.reflect.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.URIUtils;
/**
@@ -206,20 +205,17 @@ public class ControllerRequestURLBuilder {
System.out.println(ControllerRequestURLBuilder.baseUrl("localhost:8089").forInstanceCreate());
}
- public String forSegmentDownload(String tableName, String segmentName)
- throws IOException {
- return StringUtil
- .join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName, URLEncoder.encode(segmentName, "UTF-8"));
+ public String forSegmentDownload(String tableName, String segmentName) {
+ return URIUtils.constructDownloadUrl(StringUtils.chomp(_baseUrl, "/"), tableName, segmentName);
}
public String forSegmentDelete(String resourceName, String segmentName) {
return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "datafiles", resourceName, segmentName);
}
- public String forSegmentDeleteAPI(String tableName, String segmentName, String tableType)
- throws Exception {
- return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "segments", tableName,
- URLEncoder.encode(segmentName, "UTF-8") + "?type=" + tableType);
+ public String forSegmentDeleteAPI(String tableName, String segmentName, String tableType) {
+ return URIUtils.getPath(StringUtils.chomp(_baseUrl, "/"), "segments", tableName, URIUtils.encode(segmentName))
+ + "?type=" + tableType;
}
public String forSegmentDeleteAllAPI(String tableName, String tableType)
@@ -244,8 +240,9 @@ public class ControllerRequestURLBuilder {
public String forDeleteSegmentWithGetAPI(String tableName, String segmentName, String tableType)
throws Exception {
- return StringUtil.join("/", StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments",
- URLEncoder.encode(segmentName, "UTF-8") + "?state=drop&" + "type=" + tableType);
+ return URIUtils
+ .getPath(StringUtils.chomp(_baseUrl, "/"), "tables", tableName, "segments", URIUtils.encode(segmentName))
+ + "?state=drop&type=" + tableType;
}
public String forDeleteAllSegmentsWithTypeWithGetAPI(String tableName, String tableType) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 46c84c0..b2ab8b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.controller.helix.core;
-import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
@@ -41,8 +40,7 @@ import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.SegmentName;
-import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.filesystem.PinotFS;
import org.apache.pinot.filesystem.PinotFSFactory;
import org.joda.time.DateTime;
@@ -174,14 +172,9 @@ public class SegmentDeletionManager {
protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
final String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
if (_dataDir != null) {
- URI fileToMoveURI;
- PinotFS pinotFS;
- URI dataDirURI = ControllerConf.getUriFromPath(_dataDir);
- fileToMoveURI = ControllerConf.constructSegmentLocation(_dataDir, rawTableName, segmentId);
- URI deletedSegmentDestURI = ControllerConf
- .constructSegmentLocation(StringUtil.join(File.separator, _dataDir, DELETED_SEGMENTS), rawTableName,
- segmentId);
- pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
+ URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId));
+ URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, URIUtils.encode(segmentId));
+ PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme());
try {
if (pinotFS.exists(fileToMoveURI)) {
@@ -216,9 +209,8 @@ public class SegmentDeletionManager {
*/
public void removeAgedDeletedSegments(int retentionInDays) {
if (_dataDir != null) {
- URI dataDirURI = ControllerConf.getUriFromPath(_dataDir);
- URI deletedDirURI = ControllerConf.getUriFromPath(StringUtil.join(File.separator, _dataDir, DELETED_SEGMENTS));
- PinotFS pinotFS = PinotFSFactory.create(dataDirURI.getScheme());
+ URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
+ PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
try {
// Check that the directory for deleted segments exists.
@@ -234,12 +226,12 @@ public class SegmentDeletionManager {
}
for (String tableNameDir : tableNameDirs) {
- URI tableNameURI = ControllerConf.getUriFromPath(tableNameDir);
+ URI tableNameURI = URIUtils.getUri(tableNameDir);
// Get files that are aged
final String[] targetFiles = pinotFS.listFiles(tableNameURI, false);
int numFilesDeleted = 0;
for (String targetFile : targetFiles) {
- URI targetURI = ControllerConf.getUriFromPath(targetFile);
+ URI targetURI = URIUtils.getUri(targetFile);
Date dateToDelete = DateTime.now().minusDays(retentionInDays).toDate();
if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
if (!pinotFS.delete(targetURI, true)) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index c833261..22239f3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -62,7 +62,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentName;
-import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.controller.ControllerConf;
@@ -148,7 +148,6 @@ public class PinotLLCRealtimeSegmentManager {
_controllerLeadershipManager = controllerLeadershipManager;
}
-
public boolean getIsSplitCommitEnabled() {
return _controllerConf.getAcceptSplitCommit();
}
@@ -335,11 +334,10 @@ public class PinotLLCRealtimeSegmentManager {
}
String segmentName = committingSegmentDescriptor.getSegmentName();
String segmentLocation = committingSegmentDescriptor.getSegmentLocation();
- URI segmentFileURI = ControllerConf.getUriFromPath(segmentLocation);
- URI baseDirURI = ControllerConf.getUriFromPath(_controllerConf.getDataDir());
- URI tableDirURI = ControllerConf.getUriFromPath(StringUtil.join("/", _controllerConf.getDataDir(), tableName));
- URI uriToMoveTo = ControllerConf.getUriFromPath(StringUtil.join("/", tableDirURI.toString(), segmentName));
- PinotFS pinotFS = PinotFSFactory.create(baseDirURI.getScheme());
+ URI segmentFileURI = URIUtils.getUri(segmentLocation);
+ URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), tableName);
+ URI uriToMoveTo = URIUtils.getUri(_controllerConf.getDataDir(), tableName, URIUtils.encode(segmentName));
+ PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
if (!isConnected() || !isLeader()) {
// We can potentially log a different value than what we saw ....
@@ -452,7 +450,7 @@ public class PinotLLCRealtimeSegmentManager {
// Step-1
boolean success = updateOldSegmentMetadataZNRecord(realtimeTableName, committingLLCSegmentName, nextOffset,
- committingSegmentDescriptor);
+ committingSegmentDescriptor);
if (!success) {
return false;
}
@@ -483,7 +481,7 @@ public class PinotLLCRealtimeSegmentManager {
} catch (Exception e) {
LOGGER.error("Caught exception when updating ideal state for {}", committingSegmentNameStr, e);
return false;
- } finally {
+ } finally {
lock.unlock();
}
@@ -502,8 +500,7 @@ public class PinotLLCRealtimeSegmentManager {
* @return
*/
protected boolean updateOldSegmentMetadataZNRecord(String realtimeTableName, LLCSegmentName committingLLCSegmentName,
- long nextOffset,
- CommittingSegmentDescriptor committingSegmentDescriptor) {
+ long nextOffset, CommittingSegmentDescriptor committingSegmentDescriptor) {
String committingSegmentNameStr = committingLLCSegmentName.getSegmentName();
Stat stat = new Stat();
@@ -516,8 +513,8 @@ public class PinotLLCRealtimeSegmentManager {
return false;
}
if (committingSegmentDescriptor.getSegmentMetadata() == null) {
- LOGGER.error("No segment metadata found in descriptor for committing segment {} for table {}", committingLLCSegmentName,
- realtimeTableName);
+ LOGGER.error("No segment metadata found in descriptor for committing segment {} for table {}",
+ committingLLCSegmentName, realtimeTableName);
return false;
}
SegmentMetadataImpl segmentMetadata = committingSegmentDescriptor.getSegmentMetadata();
@@ -527,7 +524,7 @@ public class PinotLLCRealtimeSegmentManager {
committingSegmentMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
committingSegmentMetadata.setDownloadUrl(
- ControllerConf.constructDownloadUrl(rawTableName, committingSegmentNameStr, _controllerConf.generateVipUrl()));
+ URIUtils.constructDownloadUrl(_controllerConf.generateVipUrl(), rawTableName, committingSegmentNameStr));
committingSegmentMetadata.setCrc(Long.valueOf(segmentMetadata.getCrc()));
committingSegmentMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
committingSegmentMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
@@ -1010,8 +1007,9 @@ public class PinotLLCRealtimeSegmentManager {
LLCRealtimeSegmentZKMetadata metadata = getRealtimeSegmentZKMetadata(tableNameWithType, segmentId, stat);
long metadataUpdateTime = stat.getMtime();
if (now > metadataUpdateTime + MAX_SEGMENT_COMPLETION_TIME_MILLIS) {
- LOGGER.info("Segment:{}, Now:{}, metadataUpdateTime:{}, Exceeded MAX_SEGMENT_COMPLETION_TIME_MILLIS:{}",
- segmentId, now, metadataUpdateTime, MAX_SEGMENT_COMPLETION_TIME_MILLIS);
+ LOGGER
+ .info("Segment:{}, Now:{}, metadataUpdateTime:{}, Exceeded MAX_SEGMENT_COMPLETION_TIME_MILLIS:{}", segmentId,
+ now, metadataUpdateTime, MAX_SEGMENT_COMPLETION_TIME_MILLIS);
return true;
}
return false;
@@ -1340,8 +1338,8 @@ public class PinotLLCRealtimeSegmentManager {
Map<String, String> stateMap = idealState.getInstanceStateMap(newSegmentId);
if (stateMap == null) {
for (String instance : newSegmentInstances) {
- idealState
- .setPartitionState(newSegmentId, instance, PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE);
+ idealState.setPartitionState(newSegmentId, instance,
+ PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE);
}
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index b05f9bf..486439c 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -29,6 +29,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.ControllerLeadershipManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -40,7 +41,6 @@ import org.testng.annotations.Test;
import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.ControllerResponseStatus;
import static org.apache.pinot.common.protocols.SegmentCompletionProtocol.Request;
-import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -1165,9 +1165,8 @@ public class SegmentCompletionTest {
public boolean commitSegmentMetadata(String rawTableName, CommittingSegmentDescriptor committingSegmentDescriptor) {
_segmentMetadata.setStatus(CommonConstants.Segment.Realtime.Status.DONE);
_segmentMetadata.setEndOffset(committingSegmentDescriptor.getNextOffset());
- _segmentMetadata.setDownloadUrl(ControllerConf
- .constructDownloadUrl(rawTableName, committingSegmentDescriptor.getSegmentName(),
- CONTROLLER_CONF.generateVipUrl()));
+ _segmentMetadata.setDownloadUrl(URIUtils.constructDownloadUrl(CONTROLLER_CONF.generateVipUrl(), rawTableName,
+ committingSegmentDescriptor.getSegmentName()));
_segmentMetadata.setEndTime(_segmentCompletionMgr.getCurrentTimeMs());
return true;
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
index 65cf969..9debc81 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PinotURIUploadIntegrationTest.java
@@ -23,7 +23,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -111,7 +110,8 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet
}
}
- private File generateRandomSegment(String segmentName, int rowCount) throws Exception {
+ private File generateRandomSegment(String segmentName, int rowCount)
+ throws Exception {
ThreadLocalRandom random = ThreadLocalRandom.current();
Schema schema = new Schema.Parser()
.parse(new File(TestUtils.getFileFromResourceUrl(getClass().getClassLoader().getResource("dummy.avsc"))));
@@ -267,12 +267,12 @@ public class PinotURIUploadIntegrationTest extends BaseClusterIntegrationTestSet
}
}
- private List<String> getAllSegments(String tablename)
+ private List<String> getAllSegments(String tableName)
throws IOException {
List<String> allSegments = new ArrayList<>();
HttpHost controllerHttpHost = new HttpHost("localhost", _controllerPort);
HttpClient controllerClient = new DefaultHttpClient();
- HttpGet req = new HttpGet("/segments/" + URLEncoder.encode(tablename, "UTF-8"));
+ HttpGet req = new HttpGet("/segments/" + tableName);
HttpResponse res = controllerClient.execute(controllerHttpHost, req);
try {
if (res.getStatusLine().getStatusCode() != 200) {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java
index 18dedc4..bc7eb7a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/ClusterStarter.java
@@ -25,11 +25,11 @@ import java.io.InputStreamReader;
import java.net.SocketException;
import java.net.URL;
import java.net.URLConnection;
-import java.net.URLEncoder;
import java.net.UnknownHostException;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.utils.CommonConstants;
import org.apache.pinot.common.utils.NetUtil;
+import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.helix.ControllerRequestURLBuilder;
import org.apache.pinot.tools.admin.command.AddTableCommand;
import org.apache.pinot.tools.admin.command.CreateSegmentCommand;
@@ -254,7 +254,7 @@ public class ClusterStarter {
public int perfQuery(String query)
throws Exception {
LOGGER.debug("Running perf query on Pinot Cluster");
- String encodedQuery = URLEncoder.encode(query, "UTF-8");
+ String encodedQuery = URIUtils.encode(query);
String brokerUrl = _perfUrl + encodedQuery;
LOGGER.info("Executing command: " + brokerUrl);
URLConnection conn = new URL(brokerUrl).openConnection();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org