You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/08/11 16:14:16 UTC
hive git commit: HIVE-14270: Write temporary data to HDFS when doing
inserts on tables located on S3 (Sergio Pena, reviewed by Ashutosh Chauhan,
Lefty Leverenz)
Repository: hive
Updated Branches:
refs/heads/master 546fe8775 -> 1bdbdc4dd
HIVE-14270: Write temporary data to HDFS when doing inserts on tables located on S3 (Sergio Pena, reviewed by Ashutosh Chauhan, Lefty Leverenz)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1bdbdc4d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1bdbdc4d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1bdbdc4d
Branch: refs/heads/master
Commit: 1bdbdc4ddfc4e66ac06a99ce930a87cf52307440
Parents: 546fe87
Author: Sergio Pena <se...@cloudera.com>
Authored: Wed Jul 27 10:34:08 2016 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Thu Aug 11 11:12:20 2016 -0500
----------------------------------------------------------------------
.../hadoop/hive/common/BlobStorageUtils.java | 54 +++++++++++
.../org/apache/hadoop/hive/conf/HiveConf.java | 11 ++-
.../hive/common/TestBlobStorageUtils.java | 95 ++++++++++++++++++++
.../java/org/apache/hadoop/hive/ql/Context.java | 20 +++++
.../hive/ql/optimizer/GenMapRedUtils.java | 9 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 9 +-
.../apache/hadoop/hive/ql/exec/TestContext.java | 63 +++++++++++++
7 files changed, 253 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
new file mode 100644
index 0000000..6ca35e2
--- /dev/null
+++ b/common/src/java/org/apache/hadoop/hive/common/BlobStorageUtils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import java.util.Collection;
+
+/**
+ * Utilities for different blob (object) storage systems
+ */
+public class BlobStorageUtils {
+ private static final boolean DISABLE_BLOBSTORAGE_AS_SCRATCHDIR = false;
+
+ public static boolean isBlobStoragePath(final Configuration conf, final Path path) {
+ return (path == null) ? false : isBlobStorageScheme(conf, path.toUri().getScheme());
+ }
+
+ public static boolean isBlobStorageFileSystem(final Configuration conf, final FileSystem fs) {
+ return (fs == null) ? false : isBlobStorageScheme(conf, fs.getScheme());
+ }
+
+ public static boolean isBlobStorageScheme(final Configuration conf, final String scheme) {
+ Collection<String> supportedBlobStoreSchemes =
+ conf.getStringCollection(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname);
+
+ return supportedBlobStoreSchemes.contains(scheme);
+ }
+
+ public static boolean isBlobStorageAsScratchDir(final Configuration conf) {
+ return conf.getBoolean(
+ HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname,
+ DISABLE_BLOBSTORAGE_AS_SCRATCHDIR
+ );
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 70816bd..3e9f6ec 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3084,7 +3084,16 @@ public class HiveConf extends Configuration {
HIVE_QUERY_TIMEOUT_SECONDS("hive.query.timeout.seconds", "0s",
new TimeValidator(TimeUnit.SECONDS),
"Timeout for Running Query in seconds. A nonpositive value means infinite. " +
- "If the query timeout is also set by thrift API call, the smaller one will be taken.");
+ "If the query timeout is also set by thrift API call, the smaller one will be taken."),
+
+ /* BLOBSTORE section */
+
+ HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",
+ "Comma-separated list of supported blobstore schemes."),
+
+ HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR("hive.blobstore.use.blobstore.as.scratchdir", false,
+ "Enable the use of scratch directories directly on blob storage systems (it may cause performance penalties).");
+
public final String varname;
private final String altName;
http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java
----------------------------------------------------------------------
diff --git a/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java b/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java
new file mode 100644
index 0000000..84a0d86
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/common/TestBlobStorageUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.hive.common.BlobStorageUtils.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class TestBlobStorageUtils {
+  /**
+   * Recreated before every test. A shared static Configuration would be mutated in place by
+   * setUp(), letting one test's property changes leak into another.
+   */
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = new Configuration();
+    conf.set(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES.varname, "s3a,swift");
+    conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, false);
+  }
+
+  @Test
+  public void testValidAndInvalidPaths() {
+    // Valid paths
+    assertTrue(isBlobStoragePath(conf, new Path("s3a://bucket/path")));
+    assertTrue(isBlobStoragePath(conf, new Path("swift://bucket/path")));
+
+    // Invalid paths: scheme-less, unsupported scheme, null, and an empty URI
+    assertFalse(isBlobStoragePath(conf, new Path("/tmp/a-path")));
+    assertFalse(isBlobStoragePath(conf, new Path("s3fs://tmp/file")));
+    assertFalse(isBlobStoragePath(conf, null));
+    assertFalse(isBlobStoragePath(conf, new Path(URI.create(""))));
+  }
+
+  @Test
+  public void testValidAndInvalidFileSystems() {
+    FileSystem fs = mock(FileSystem.class);
+
+    /* Valid FileSystem schemes */
+
+    doReturn("s3a").when(fs).getScheme();
+    assertTrue(isBlobStorageFileSystem(conf, fs));
+
+    doReturn("swift").when(fs).getScheme();
+    assertTrue(isBlobStorageFileSystem(conf, fs));
+
+    /* Invalid FileSystem schemes */
+
+    doReturn("hdfs").when(fs).getScheme();
+    assertFalse(isBlobStorageFileSystem(conf, fs));
+
+    doReturn("").when(fs).getScheme();
+    assertFalse(isBlobStorageFileSystem(conf, fs));
+
+    assertFalse(isBlobStorageFileSystem(conf, null));
+  }
+
+  @Test
+  public void testValidAndInvalidSchemes() {
+    // Valid schemes
+    assertTrue(isBlobStorageScheme(conf, "s3a"));
+    assertTrue(isBlobStorageScheme(conf, "swift"));
+
+    // Invalid schemes
+    assertFalse(isBlobStorageScheme(conf, "hdfs"));
+    assertFalse(isBlobStorageScheme(conf, ""));
+    assertFalse(isBlobStorageScheme(conf, null));
+  }
+
+  @Test
+  public void testBlobStorageAsScratchDir() {
+    // setUp() explicitly disables the use of blob storage as a scratch directory.
+    assertFalse(isBlobStorageAsScratchDir(conf));
+
+    conf.setBoolean(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR.varname, true);
+    assertTrue(isBlobStorageAsScratchDir(conf));
+  }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index 89893eb..3785b1e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.TaskRunner;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
@@ -351,6 +352,25 @@ public class Context {
}
}
+ /**
+ * Create a temporary directory depending of the path specified.
+ * - If path is an Object store filesystem, then use the default MR scratch directory (HDFS)
+ * - If path is on HDFS, then create a staging directory inside the path
+ *
+ * @param path Path used to verify the Filesystem to use for temporary directory
+ * @return A path to the new temporary directory
+ */
+ public Path getTempDirForPath(Path path) {
+ if (BlobStorageUtils.isBlobStoragePath(conf, path) && !BlobStorageUtils.isBlobStorageAsScratchDir(conf)) {
+ // For better write performance, we use HDFS for temporary data when object store is used.
+ // Note that the scratch directory configuration variable must use HDFS or any other non-blobstorage system
+ // to take advantage of this performance.
+ return getMRTmpPath();
+ } else {
+ return getExtTmpPathRelTo(path);
+ }
+ }
+
private Path getExternalScratchDir(URI extURI) {
return getStagingDir(new Path(extURI.getScheme(), extURI.getAuthority(), extURI.getPath()), !explain);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
index 5bd7886..cea99e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
@@ -36,6 +36,7 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.Warehouse;
@@ -1796,15 +1797,17 @@ public final class GenMapRedUtils {
Path dest = null;
if (chDir) {
- dest = fsOp.getConf().getFinalDirName();
+ FileSinkDesc fileSinkDesc = fsOp.getConf();
+ dest = fileSinkDesc.getFinalDirName();
// generate the temporary file
// it must be on the same file system as the current destination
Context baseCtx = parseCtx.getContext();
- Path tmpDir = baseCtx.getExternalTmpPath(dest);
+ // Create the required temporary file in the HDFS location if the destination
+ // path of the FileSinkOperator table is a blobstore path.
+ Path tmpDir = baseCtx.getTempDirForPath(fileSinkDesc.getDestPath());
- FileSinkDesc fileSinkDesc = fsOp.getConf();
// Change all the linked file sink descriptors
if (fileSinkDesc.isLinkedFileSink()) {
for (FileSinkDesc fsConf:fileSinkDesc.getLinkedFileSinkDesc()) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a01a7bd..6758741 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ObjectPair;
@@ -6642,7 +6643,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
if (isNonNativeTable) {
queryTmpdir = dest_path;
} else {
- queryTmpdir = ctx.getExtTmpPathRelTo(dest_path);
+ queryTmpdir = ctx.getTempDirForPath(dest_path);
}
if (dpCtx != null) {
// set the root of the temporary path where dynamic partition columns will populate
@@ -6759,7 +6760,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
.getAuthority(), partPath.toUri().getPath());
- queryTmpdir = ctx.getExternalTmpPath(dest_path);
+ queryTmpdir = ctx.getTempDirForPath(dest_path);
table_desc = Utilities.getTableDesc(dest_tab);
// Add sorting/bucketing if needed
@@ -6807,7 +6808,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
try {
Path qPath = FileUtils.makeQualified(dest_path, conf);
- queryTmpdir = ctx.getExtTmpPathRelTo(qPath);
+ queryTmpdir = ctx.getTempDirForPath(qPath);
} catch (Exception e) {
throw new SemanticException("Error creating temporary folder on: "
+ dest_path, e);
@@ -7015,7 +7016,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
fileSinkDesc.setStatsAggPrefix(fileSinkDesc.getDirName().toString());
if (!destTableIsMaterialization &&
HiveConf.getVar(conf, HIVESTATSDBCLASS).equalsIgnoreCase(StatDB.fs.name())) {
- String statsTmpLoc = ctx.getExtTmpPathRelTo(queryTmpdir).toString();
+ String statsTmpLoc = ctx.getTempDirForPath(dest_path).toString();
fileSinkDesc.setStatsTmpDir(statsTmpLoc);
LOG.debug("Set stats collection dir : " + statsTmpLoc);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1bdbdc4d/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
new file mode 100644
index 0000000..4a4c240
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestContext.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+public class TestContext {
+  private static final HiveConf conf = new HiveConf();
+
+  private Context context;
+
+  @Before
+  public void setUp() throws IOException {
+    // Pin the blobstore settings these tests rely on instead of depending on defaults.
+    // conf is shared by all tests, so reset it before each one.
+    conf.setVar(HiveConf.ConfVars.HIVE_BLOBSTORE_SUPPORTED_SCHEMES, "s3a");
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR, false);
+
+    /* Only called to create session directories used by the Context class */
+    SessionState.start(conf);
+    SessionState.detachSession();
+
+    context = new Context(conf);
+  }
+
+  @Test
+  public void testGetScratchDirectoriesForPaths() throws IOException {
+    Context spyContext = spy(context);
+
+    // For object store paths, getMRTmpPath() is called to get a temporary directory
+    // under the default scratch directory location (typically on HDFS).
+    Path mrTmpPath = new Path("hdfs://hostname/tmp/scratch");
+    doReturn(mrTmpPath).when(spyContext).getMRTmpPath();
+    assertEquals(mrTmpPath, spyContext.getTempDirForPath(new Path("s3a://bucket/dir")));
+
+    // For non-object store paths, getExtTmpPathRelTo() is called to get a temporary
+    // directory relative to the path passed as a parameter.
+    Path tmpPathRelTo = new Path("hdfs://hostname/user");
+    doReturn(tmpPathRelTo).when(spyContext).getExtTmpPathRelTo(any(Path.class));
+    assertEquals(tmpPathRelTo, spyContext.getTempDirForPath(new Path("/user")));
+  }
+
+  @Test
+  public void testGetScratchDirectoryForBlobStorePathWhenBlobStoreScratchDirEnabled()
+      throws IOException {
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_BLOBSTORE_USE_BLOBSTORE_AS_SCRATCHDIR, true);
+    Context spyContext = spy(new Context(conf));
+
+    // With blob storage allowed as a scratch directory, object store paths get a staging
+    // directory relative to the path itself and the MR scratch directory is not used.
+    Path tmpPathRelTo = new Path("s3a://bucket/dir/.staging");
+    doReturn(tmpPathRelTo).when(spyContext).getExtTmpPathRelTo(any(Path.class));
+    assertEquals(tmpPathRelTo, spyContext.getTempDirForPath(new Path("s3a://bucket/dir")));
+    verify(spyContext, never()).getMRTmpPath();
+  }
+}